entelix-agents 0.5.4

entelix production agent runtime — ReAct / Supervisor / Hierarchical / Chat recipes, tool-side layer ecosystem (approval / event / hook), sink adapters (broadcast / capture / channel / dropping / fail-open / fan-out / state-erasure), chat-shape state helpers
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
//! `ApprovalLayer` — `tower::Layer<S>` that gates every
//! `Service<ToolInvocation>` dispatch through an [`Approver::decide`]
//! call. On approval the inner service runs; on rejection the layer
//! short-circuits with [`Error::InvalidRequest`] carrying the
//! approver's reason. When a [`ToolApprovalEventSinkHandle`] is
//! attached to the request's [`ExecutionContext`] (via the agent's
//! standard wiring), the layer also emits
//! [`crate::agent::AgentEvent::ToolCallApproved`] /
//! [`crate::agent::AgentEvent::ToolCallDenied`] for observability.
//!
//! ## Wiring
//!
//! Operators rarely attach the layer manually — `ReActAgentBuilder`
//! auto-wires it when an `Approver` is configured. The wiring routes:
//!
//! 1. `ReActAgentBuilder::with_approver(approver)` →
//! 2. `build()` calls `tools.layer(ApprovalLayer::new(approver))` →
//! 3. Every `tools.dispatch(...)` inside the agent's graph passes
//!    through the layer →
//! 4. `Approver::decide` runs before the inner tool service →
//! 5. `Agent::execute_inner` attaches a `ToolApprovalEventSinkHandle`
//!    to the request `ExecutionContext` so the layer can emit through
//!    the agent's typed `AgentEventSink<S>` without taking it as a
//!    constructor argument (which would tie the layer to a specific
//!    `S`).
//!
//! ## Type erasure across the sink
//!
//! `AgentEventSink<S>` is generic over the agent's state type;
//! `ApprovalLayer` lives below the agent (one layer instance, many
//! `S` shapes if the same registry feeds heterogeneous agents).
//! [`ToolApprovalEventSink`] is the type-erased trait the layer
//! actually consumes — [`ToolApprovalEventSinkHandle::for_agent_sink`]
//! is the bridge that adapts any `Arc<dyn AgentEventSink<S>>` into
//! the type-erased shape.
//!
//! ## `AwaitExternal` pause-and-resume
//!
//! When `Approver::decide` returns `ApprovalDecision::AwaitExternal`,
//! the layer raises `Error::Interrupted` with
//! [`InterruptionKind::ApprovalPending { tool_use_id }`](entelix_core::interruption::InterruptionKind::ApprovalPending)
//! and a payload carrying the pending dispatch's `run_id` / `tool` /
//! `input` for operator-side audit. The graph dispatch loop catches
//! it, persists a checkpoint with pre-node state, and surfaces the
//! typed error to the caller — the agent run pauses cleanly with no
//! inflight resources.
//!
//! Resume delivers the operator's eventual decision through the
//! typed `Command::ApproveTool { tool_use_id, decision }`, which
//! attaches the decision overrides to `ExecutionContext` before
//! re-entering the same dispatch. The layer's override lookup runs
//! first and short-circuits the approver — the resumed run completes
//! the pending tool call without re-asking.
//!
//! ## What the layer does NOT cover
//!
//! - **Per-tool approver bypasses.** Operators that want to skip
//!   approval for a subset of tools *by name* wire a custom `Approver`
//!   impl that returns `Approve` for those names. The layer itself
//!   only narrows by effect class (see [`EffectGate`]); within the
//!   configured gate it stays unconditional.

use std::sync::Arc;
use std::task::{Context, Poll};

use async_trait::async_trait;
use futures::future::BoxFuture;
use serde_json::{Value, json};
use tower::{Layer, Service};

use entelix_core::PendingApprovalDecisions;
use entelix_core::TenantId;
use entelix_core::error::{Error, Result};
use entelix_core::interruption::InterruptionKind;
use entelix_core::service::ToolInvocation;
use entelix_core::tools::ToolEffect;

use crate::agent::approver::{ApprovalDecision, ApprovalRequest, Approver};
use crate::agent::event::AgentEvent;
use crate::agent::sink::AgentEventSink;

/// Type-erased sink for tool-approval events. The agent runtime
/// produces an implementation by adapting its
/// `Arc<dyn AgentEventSink<S>>` (see
/// [`ToolApprovalEventSinkHandle::for_agent_sink`]); operators
/// implementing custom downstream observability (OTel direct,
/// audit-log direct) can implement this trait directly without
/// going through `AgentEventSink<S>`.
///
/// Both methods return `()`: an implementation has no channel for
/// reporting a delivery failure back to the layer, so a sink
/// problem never changes the outcome of the dispatch itself.
#[async_trait]
pub trait ToolApprovalEventSink: Send + Sync + 'static {
    /// Record an approval decision. The layer awaits the call so
    /// the approval marker fires *before* the inner tool service
    /// begins; observability ordering matches the operator's mental
    /// model (approve → start → complete).
    ///
    /// `tenant_id` is the scope of the originating
    /// [`ToolInvocation`]'s [`entelix_core::ExecutionContext`]; the
    /// `AgentEventSink<S>` adapter stamps it onto the emitted
    /// [`AgentEvent::ToolCallApproved`] so audit/billing/replay
    /// consumers read the same tenant scope as every other event in
    /// the run.
    ///
    /// `run_id` is the run identifier read from the context (the
    /// layer passes an empty string when the context carries none),
    /// `tool_use_id` identifies the individual dispatch, and `tool`
    /// is the tool's metadata name.
    async fn record_approved(
        &self,
        tenant_id: &TenantId,
        run_id: &str,
        tool_use_id: &str,
        tool: &str,
    );

    /// Record a denial decision. The layer awaits and then returns
    /// `Error::InvalidRequest` to the caller; the matching
    /// `ToolStart` does NOT fire. `tenant_id` mirrors `record_approved`.
    ///
    /// `reason` is the text carried by the `ApprovalDecision::Reject`
    /// that triggered the denial; the same text is embedded in the
    /// error the caller receives.
    async fn record_denied(
        &self,
        tenant_id: &TenantId,
        run_id: &str,
        tool_use_id: &str,
        tool: &str,
        reason: &str,
    );
}

/// Refcounted handle for [`ToolApprovalEventSink`]. Stored in
/// [`entelix_core::ExecutionContext`] extensions so [`ApprovalLayer`] finds the
/// sink without taking it as a constructor argument.
///
/// `Clone` is cheap (the inner sink rides behind `Arc`).
#[derive(Clone)]
pub struct ToolApprovalEventSinkHandle {
    // Type-erased sink; shared (not copied) by every clone of the handle.
    sink: Arc<dyn ToolApprovalEventSink>,
}

impl ToolApprovalEventSinkHandle {
    /// Build a handle around any [`ToolApprovalEventSink`]
    /// implementation. This is the entry point for custom
    /// direct-observability sinks that have no `AgentEventSink<S>`
    /// to bridge from.
    pub fn new<E>(sink: E) -> Self
    where
        E: ToolApprovalEventSink,
    {
        let erased: Arc<dyn ToolApprovalEventSink> = Arc::new(sink);
        Self { sink: erased }
    }

    /// Bridge an agent's typed [`AgentEventSink<S>`] into the
    /// type-erased shape the layer consumes. Calls to
    /// `record_approved` surface as [`AgentEvent::ToolCallApproved`]
    /// and calls to `record_denied` surface as
    /// [`AgentEvent::ToolCallDenied`] on the wrapped sink.
    pub fn for_agent_sink<S>(sink: Arc<dyn AgentEventSink<S>>) -> Self
    where
        S: Clone + Send + Sync + 'static,
    {
        // The private adapter is itself a `ToolApprovalEventSink`, so
        // the generic constructor performs the erasure.
        Self::new(SinkAdapter { sink })
    }

    /// Access the erased sink behind the handle. The layer's
    /// dispatch path is the main consumer; operators normally reach
    /// the sink through the trait object the layer reads from
    /// `ExecutionContext`.
    pub fn inner(&self) -> &Arc<dyn ToolApprovalEventSink> {
        let Self { sink } = self;
        sink
    }
}

/// Private bridge from a typed `AgentEventSink<S>` to the type-erased
/// [`ToolApprovalEventSink`]; constructed only by
/// [`ToolApprovalEventSinkHandle::for_agent_sink`].
struct SinkAdapter<S> {
    // The agent's typed sink that receives the translated events.
    sink: Arc<dyn AgentEventSink<S>>,
}

#[async_trait]
impl<S> ToolApprovalEventSink for SinkAdapter<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Translate an approval into [`AgentEvent::ToolCallApproved`]
    /// and hand it to the wrapped typed sink.
    async fn record_approved(
        &self,
        tenant_id: &TenantId,
        run_id: &str,
        tool_use_id: &str,
        tool: &str,
    ) {
        // The trait method returns `()`, so whatever `send` reports is
        // deliberately discarded here.
        let _ = self
            .sink
            .send(AgentEvent::<S>::ToolCallApproved {
                run_id: String::from(run_id),
                tenant_id: tenant_id.clone(),
                tool_use_id: String::from(tool_use_id),
                tool: String::from(tool),
            })
            .await;
    }

    /// Translate a denial into [`AgentEvent::ToolCallDenied`],
    /// carrying the rejection `reason`, and hand it to the wrapped
    /// typed sink.
    async fn record_denied(
        &self,
        tenant_id: &TenantId,
        run_id: &str,
        tool_use_id: &str,
        tool: &str,
        reason: &str,
    ) {
        // Same best-effort contract as `record_approved`: the result of
        // `send` is dropped.
        let _ = self
            .sink
            .send(AgentEvent::<S>::ToolCallDenied {
                run_id: String::from(run_id),
                tenant_id: tenant_id.clone(),
                tool_use_id: String::from(tool_use_id),
                tool: String::from(tool),
                reason: String::from(reason),
            })
            .await;
    }
}

/// Selector for which tool dispatches the [`ApprovalLayer`] gates
/// through the [`Approver`]. Routes by the calling tool's
/// [`ToolMetadata::effect`](entelix_core::tools::ToolMetadata) so
/// operators express *intent* once at metadata time and the layer
/// honours it without per-tool wiring.
///
/// The enum is `#[non_exhaustive]`: code outside this crate that
/// matches on it needs a wildcard arm. The default is
/// [`EffectGate::Always`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[non_exhaustive]
pub enum EffectGate {
    /// Every dispatch reaches the approver — the original behaviour
    /// (and the safe default for deployments where every tool call
    /// must have an explicit go-ahead).
    #[default]
    Always,
    /// Only [`ToolEffect::Destructive`] tools reach the approver.
    /// `ReadOnly` and `Mutating` calls auto-approve. The narrowest
    /// gate that still requires confirmation for irreversible
    /// operations — typical "agent has free rein except for the
    /// dangerous handful" deployments.
    DestructiveOnly,
    /// Both [`ToolEffect::Mutating`] and [`ToolEffect::Destructive`]
    /// reach the approver. `ReadOnly` calls auto-approve.
    /// Appropriate when even reversible writes need a human in the
    /// loop (regulated workloads, compliance-bound flows).
    MutatingAndAbove,
}

impl EffectGate {
    /// Decide whether a tool carrying `effect` must go through the
    /// approver under this gate.
    ///
    /// Returns `true` when the approver has to be consulted and
    /// `false` when the dispatch may auto-approve.
    #[must_use]
    pub const fn requires_approval(self, effect: ToolEffect) -> bool {
        // Classify the effect once; each policy is then a plain
        // boolean over the two flags.
        let destructive = matches!(effect, ToolEffect::Destructive);
        let mutating = matches!(effect, ToolEffect::Mutating);
        match self {
            Self::Always => true,
            Self::DestructiveOnly => destructive,
            Self::MutatingAndAbove => mutating || destructive,
        }
    }
}

/// `tower::Layer<S>` that gates a `Service<ToolInvocation, Response = Value, Error = Error>`
/// through an [`Approver`]. Construct via [`ApprovalLayer::new`];
/// attach to a `ToolRegistry` via
/// [`entelix_core::tools::ToolRegistry::layer`].
pub struct ApprovalLayer {
    // Decision source shared with every service this layer produces.
    approver: Arc<dyn Approver>,
    // Which tool effects are routed to the approver; copied into each
    // produced service.
    gate: EffectGate,
}

impl ApprovalLayer {
    /// Identifier reported through
    /// [`entelix_core::tools::ToolRegistry::layer_names`]. It is
    /// meant to stay stable across patch versions: dashboards keyed
    /// off the value break if it is renamed.
    pub const NAME: &'static str = "tool_approval";

    /// Create a layer around `approver` using the default gate,
    /// [`EffectGate::Always`], under which every dispatch reaches
    /// the approver. The approver is held behind an `Arc`, so
    /// cloning the layer only bumps the refcount.
    pub fn new(approver: Arc<dyn Approver>) -> Self {
        let gate = EffectGate::default();
        Self { approver, gate }
    }

    /// Replace the gate so only tools whose effect matches `gate`
    /// reach the approver. Dispatches outside the gate go straight
    /// to the inner service without consulting the approver —
    /// "approve everything destructive, autopilot the rest" becomes
    /// a declarative choice made once at wiring time.
    #[must_use]
    pub const fn with_effect_gate(mut self, gate: EffectGate) -> Self {
        self.gate = gate;
        self
    }
}

impl Clone for ApprovalLayer {
    /// Produce a layer that shares the same approver (refcount bump
    /// only) and carries a copy of the gate.
    fn clone(&self) -> Self {
        let Self { approver, gate } = self;
        Self {
            approver: Arc::clone(approver),
            gate: gate.clone(),
        }
    }
}

impl<S> Layer<S> for ApprovalLayer {
    type Service = ApprovalService<S>;

    /// Wrap `inner` in an [`ApprovalService`] that shares this
    /// layer's approver and carries a copy of its gate.
    fn layer(&self, inner: S) -> Self::Service {
        let Self { approver, gate } = self;
        ApprovalService {
            inner,
            approver: Arc::clone(approver),
            gate: gate.clone(),
        }
    }
}

impl entelix_core::NamedLayer for ApprovalLayer {
    fn layer_name(&self) -> &'static str {
        Self::NAME
    }
}

/// `tower::Service<ToolInvocation>` produced by [`ApprovalLayer`].
/// Public so operators that wire dispatch paths manually can
/// compose it directly.
pub struct ApprovalService<S> {
    // The wrapped service that runs the tool once a dispatch is approved
    // (or falls outside the gate).
    inner: S,
    // Decision source, shared with the originating layer.
    approver: Arc<dyn Approver>,
    // Which tool effects are routed to the approver.
    gate: EffectGate,
}

impl<S: Clone> Clone for ApprovalService<S> {
    /// Clone the wrapped service, share the approver (refcount bump
    /// only) and copy the gate.
    fn clone(&self) -> Self {
        let Self {
            inner,
            approver,
            gate,
        } = self;
        Self {
            inner: S::clone(inner),
            approver: Arc::clone(approver),
            gate: gate.clone(),
        }
    }
}

impl<S> Service<ToolInvocation> for ApprovalService<S>
where
    S: Service<ToolInvocation, Response = Value, Error = Error> + Clone + Send + 'static,
    S::Future: Send + 'static,
{
    type Response = Value;
    type Error = Error;
    type Future = BoxFuture<'static, Result<Value>>;

    /// Readiness is delegated to the wrapped service; the approval
    /// step itself never applies backpressure.
    #[inline]
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.inner.poll_ready(cx)
    }

    /// Gate one tool dispatch.
    ///
    /// Resolution order inside the returned future:
    ///
    /// 1. A decision stored under this `tool_use_id` in the context's
    ///    [`PendingApprovalDecisions`] extension (resume path) is used
    ///    as-is and the approver is not consulted.
    /// 2. Otherwise, if the tool's effect lies outside the configured
    ///    [`EffectGate`], the inner service runs immediately — no
    ///    approver call and no approval event.
    /// 3. Otherwise [`Approver::decide`] produces the decision; an
    ///    error from the approver is returned to the caller unchanged.
    ///
    /// The decision then maps to: `Approve` → emit the approved event
    /// (when a [`ToolApprovalEventSinkHandle`] is attached) and run the
    /// inner service; `Reject` → emit the denied event and return
    /// `Error::InvalidRequest`; `AwaitExternal` → return
    /// `Error::Interrupted` with an `ApprovalPending` kind and an audit
    /// payload; any other variant → a configuration error.
    fn call(&mut self, invocation: ToolInvocation) -> Self::Future {
        let approver = Arc::clone(&self.approver);
        let gate = self.gate.clone();
        // `poll_ready` drove `self.inner` to readiness, and a clone of
        // a service is not guaranteed to share that readiness. Move
        // the instance that was actually polled into the future and
        // leave a fresh clone behind for the next
        // `poll_ready` / `call` cycle.
        let replacement = self.inner.clone();
        let mut inner = std::mem::replace(&mut self.inner, replacement);
        Box::pin(async move {
            // Override lookup comes first so an explicit pending
            // decision (resume path) takes precedence over everything
            // else, including the effect gate below.
            let override_decision = invocation
                .ctx
                .extension::<PendingApprovalDecisions>()
                .and_then(|o| o.get(&invocation.tool_use_id).cloned());
            // Effect-gate short-circuit — tools whose effect lies
            // outside the configured gate skip the approver entirely
            // and auto-approve at the inner service. Because the gate
            // is applied AFTER the override lookup, operators that
            // paused on a gated tool see their decision honoured even
            // after a policy change narrows the gate mid-flight.
            if override_decision.is_none() && !gate.requires_approval(invocation.metadata.effect) {
                return inner.call(invocation).await;
            }
            let decision = if let Some(d) = override_decision {
                d
            } else {
                let request = ApprovalRequest::new(
                    invocation.tool_use_id.clone(),
                    invocation.metadata.name.clone(),
                    invocation.input.clone(),
                );
                approver.decide(&request, &invocation.ctx).await?
            };

            // Snapshot what the event / error paths need. The `Approve`
            // arm moves `invocation` into the inner service, so these
            // are owned copies rather than borrows.
            let sink = invocation.ctx.extension::<ToolApprovalEventSinkHandle>();
            let tenant_id = invocation.ctx.tenant_id().clone();
            // A context without a run id is reported as the empty string.
            let run_id = invocation.ctx.run_id().unwrap_or("").to_owned();
            let tool_use_id = invocation.tool_use_id.clone();
            let tool_name = invocation.metadata.name.clone();

            match decision {
                ApprovalDecision::Approve => {
                    // Awaited before the inner call so the approval
                    // marker precedes anything the tool itself emits.
                    if let Some(handle) = sink.as_deref() {
                        handle
                            .inner()
                            .record_approved(&tenant_id, &run_id, &tool_use_id, &tool_name)
                            .await;
                    }
                    inner.call(invocation).await
                }
                ApprovalDecision::Reject { reason } => {
                    if let Some(handle) = sink.as_deref() {
                        handle
                            .inner()
                            .record_denied(&tenant_id, &run_id, &tool_use_id, &tool_name, &reason)
                            .await;
                    }
                    Err(Error::invalid_request(format!(
                        "approver rejected tool '{tool_name}' dispatch: {reason}"
                    )))
                }
                ApprovalDecision::AwaitExternal => {
                    // Pause the agent via graph interrupt. The
                    // payload identifies the pending approval so
                    // the operator can match it against an out-of-
                    // band review queue. Resume via
                    // `Command::ApproveTool { tool_use_id, decision }`
                    // re-enters the same dispatch with the operator's
                    // decision attached to ctx — the override-lookup
                    // branch above short-circuits without re-asking
                    // the approver.
                    //
                    // The input is serialised straight from the
                    // invocation (still owned in this arm), so no
                    // up-front clone is paid on the other paths.
                    Err(Error::Interrupted {
                        kind: InterruptionKind::ApprovalPending {
                            tool_use_id: tool_use_id.clone(),
                        },
                        payload: json!({
                            "run_id": run_id,
                            "tool_use_id": tool_use_id,
                            "tool": tool_name,
                            "input": &invocation.input,
                        }),
                    })
                }
                // `ApprovalDecision` is `#[non_exhaustive]` — surface
                // any future variant the layer doesn't yet wire as a
                // typed configuration error rather than a silent
                // dispatch.
                _ => Err(Error::config(format!(
                    "ApprovalLayer received an unsupported `ApprovalDecision` variant for tool '{tool_name}'; \
                     update the layer to handle the new variant"
                ))),
            }
        })
    }
}

// Unit tests for the approval layer: approve / reject / await-external
// outcomes, sink event recording, resume-path decision overrides, and
// composition with an inner `ScopedToolLayer`.
#[cfg(test)]
// Tests unwrap results and index into JSON payloads directly; a panic is
// the intended failure mode here.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use entelix_core::AgentContext;
    use entelix_core::ExecutionContext;
    use entelix_core::tools::{Tool, ToolMetadata, ToolRegistry};
    use serde_json::json;

    use super::*;
    use crate::agent::approver::{AlwaysApprove, ApprovalDecision, ApprovalRequest};

    /// Minimal tool registered as `echo` that returns its input
    /// unchanged, so a successful dispatch is observable from the
    /// result value.
    struct EchoTool {
        metadata: ToolMetadata,
    }

    impl EchoTool {
        fn new() -> Self {
            Self {
                metadata: ToolMetadata::function(
                    "echo",
                    "Echo input verbatim.",
                    json!({ "type": "object" }),
                ),
            }
        }
    }

    #[async_trait]
    impl Tool for EchoTool {
        fn metadata(&self) -> &ToolMetadata {
            &self.metadata
        }

        async fn execute(&self, input: Value, _ctx: &AgentContext<()>) -> Result<Value> {
            Ok(input)
        }
    }

    /// Approver that rejects every request with a fixed reason.
    struct AlwaysReject {
        reason: String,
    }

    #[async_trait]
    impl Approver for AlwaysReject {
        async fn decide(
            &self,
            _request: &ApprovalRequest,
            _ctx: &ExecutionContext,
        ) -> Result<ApprovalDecision> {
            Ok(ApprovalDecision::Reject {
                reason: self.reason.clone(),
            })
        }
    }

    /// Sink that only counts how many approvals and denials it was
    /// asked to record; the counters are shared with the test body.
    struct CountingApprovalSink {
        approved: Arc<AtomicUsize>,
        denied: Arc<AtomicUsize>,
    }

    #[async_trait]
    impl ToolApprovalEventSink for CountingApprovalSink {
        async fn record_approved(
            &self,
            _tenant_id: &TenantId,
            _run_id: &str,
            _tool_use_id: &str,
            _tool: &str,
        ) {
            self.approved.fetch_add(1, Ordering::SeqCst);
        }
        async fn record_denied(
            &self,
            _tenant_id: &TenantId,
            _run_id: &str,
            _tool_use_id: &str,
            _tool: &str,
            _reason: &str,
        ) {
            self.denied.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[tokio::test]
    async fn approver_approve_dispatches_inner_tool() {
        // Approve path: the echo tool runs and its output comes back
        // untouched.
        let approver: Arc<dyn Approver> = Arc::new(AlwaysApprove);
        let registry = ToolRegistry::new()
            .layer(ApprovalLayer::new(approver))
            .register(Arc::new(EchoTool::new()))
            .unwrap();
        let ctx = ExecutionContext::new();
        let result = registry
            .dispatch("", "echo", json!({"x": 1}), &ctx)
            .await
            .unwrap();
        assert_eq!(result, json!({"x": 1}));
    }

    #[tokio::test]
    async fn approver_reject_short_circuits_dispatch() {
        // Reject path: the caller gets `InvalidRequest` whose message
        // names the tool and carries the approver's reason.
        let approver: Arc<dyn Approver> = Arc::new(AlwaysReject {
            reason: "policy violation".to_owned(),
        });
        let registry = ToolRegistry::new()
            .layer(ApprovalLayer::new(approver))
            .register(Arc::new(EchoTool::new()))
            .unwrap();
        let ctx = ExecutionContext::new();
        let err = registry
            .dispatch("", "echo", json!({"x": 1}), &ctx)
            .await
            .unwrap_err();
        match err {
            Error::InvalidRequest(msg) => {
                assert!(msg.contains("approver rejected tool 'echo'"), "got: {msg}");
                assert!(msg.contains("policy violation"), "got: {msg}");
            }
            other => panic!("expected InvalidRequest, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn approval_sink_records_both_decisions() {
        // One context carrying the counting sink is reused across an
        // approving and a rejecting registry; each decision must bump
        // exactly its own counter.
        let approved = Arc::new(AtomicUsize::new(0));
        let denied = Arc::new(AtomicUsize::new(0));
        let sink = CountingApprovalSink {
            approved: Arc::clone(&approved),
            denied: Arc::clone(&denied),
        };
        let handle = ToolApprovalEventSinkHandle::new(sink);
        let ctx = ExecutionContext::new().add_extension(handle);

        // First dispatch — approver allows.
        let approver_ok: Arc<dyn Approver> = Arc::new(AlwaysApprove);
        let registry = ToolRegistry::new()
            .layer(ApprovalLayer::new(approver_ok))
            .register(Arc::new(EchoTool::new()))
            .unwrap();
        registry
            .dispatch("", "echo", json!({"x": 1}), &ctx)
            .await
            .unwrap();
        assert_eq!(approved.load(Ordering::SeqCst), 1);
        assert_eq!(denied.load(Ordering::SeqCst), 0);

        // Second dispatch — approver rejects on a fresh registry.
        let approver_no: Arc<dyn Approver> = Arc::new(AlwaysReject {
            reason: "no".into(),
        });
        let registry = ToolRegistry::new()
            .layer(ApprovalLayer::new(approver_no))
            .register(Arc::new(EchoTool::new()))
            .unwrap();
        // The dispatch result is ignored on purpose; only the counters
        // are under test here.
        let _ = registry.dispatch("", "echo", json!({"x": 1}), &ctx).await;
        assert_eq!(approved.load(Ordering::SeqCst), 1);
        assert_eq!(denied.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn approval_layer_runs_without_sink_attached() {
        // No sink in ctx — layer must still gate dispatch correctly.
        let approver: Arc<dyn Approver> = Arc::new(AlwaysApprove);
        let registry = ToolRegistry::new()
            .layer(ApprovalLayer::new(approver))
            .register(Arc::new(EchoTool::new()))
            .unwrap();
        let result = registry
            .dispatch("", "echo", json!({"x": 1}), &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(result, json!({"x": 1}));
    }

    /// Approver that always defers to an out-of-band decision.
    struct AlwaysAwait;

    #[async_trait]
    impl Approver for AlwaysAwait {
        async fn decide(
            &self,
            _request: &ApprovalRequest,
            _ctx: &ExecutionContext,
        ) -> Result<ApprovalDecision> {
            Ok(ApprovalDecision::AwaitExternal)
        }
    }

    #[tokio::test]
    async fn await_external_raises_interrupted_with_payload() {
        // The pause-and-resume contract: AwaitExternal must surface
        // as `Error::Interrupted` so the graph dispatch loop can
        // checkpoint pre-state and bubble the typed error to the
        // caller. The payload identifies the pending dispatch so
        // the operator can route it to an out-of-band review queue.
        let approver: Arc<dyn Approver> = Arc::new(AlwaysAwait);
        let registry = ToolRegistry::new()
            .layer(ApprovalLayer::new(approver))
            .register(Arc::new(EchoTool::new()))
            .unwrap();
        let err = registry
            .dispatch("tu-1", "echo", json!({"x": 1}), &ExecutionContext::new())
            .await
            .unwrap_err();
        match err {
            Error::Interrupted { kind, payload } => {
                assert_eq!(
                    kind,
                    InterruptionKind::ApprovalPending {
                        tool_use_id: "tu-1".into()
                    }
                );
                assert_eq!(payload["tool_use_id"].as_str(), Some("tu-1"));
                assert_eq!(payload["tool"].as_str(), Some("echo"));
                assert_eq!(payload["input"], json!({"x": 1}));
            }
            other => panic!("expected Interrupted, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn approval_decision_overrides_short_circuit_approver() {
        // Resume path simulation: the approver still says "await",
        // but the operator has attached a decision override for
        // this tool_use_id (mimicking what `agent.resume_with(...)`
        // will do once the resume API ships). The layer must use
        // the override and skip the approver.
        let approver: Arc<dyn Approver> = Arc::new(AlwaysAwait);
        let registry = ToolRegistry::new()
            .layer(ApprovalLayer::new(approver))
            .register(Arc::new(EchoTool::new()))
            .unwrap();
        let overrides = {
            let mut p = PendingApprovalDecisions::new();
            p.insert("tu-1", ApprovalDecision::Approve);
            p
        };
        let ctx = ExecutionContext::new().add_extension(overrides);

        let result = registry
            .dispatch("tu-1", "echo", json!({"x": 1}), &ctx)
            .await
            .unwrap();
        assert_eq!(result, json!({"x": 1}));
    }

    #[tokio::test]
    async fn approval_decision_overrides_propagate_reject_decision() {
        // Operator's out-of-band decision was Reject — the override
        // must propagate that reject through the same code path so
        // the resume produces a typed rejection rather than a
        // re-fired AwaitExternal.
        let approver: Arc<dyn Approver> = Arc::new(AlwaysAwait);
        let registry = ToolRegistry::new()
            .layer(ApprovalLayer::new(approver))
            .register(Arc::new(EchoTool::new()))
            .unwrap();
        let mut overrides = PendingApprovalDecisions::new();
        overrides.insert(
            "tu-1",
            ApprovalDecision::Reject {
                reason: "operator declined out-of-band".to_owned(),
            },
        );
        let ctx = ExecutionContext::new().add_extension(overrides);

        let err = registry
            .dispatch("tu-1", "echo", json!({"x": 1}), &ctx)
            .await
            .unwrap_err();
        match err {
            Error::InvalidRequest(msg) => {
                assert!(
                    msg.contains("operator declined out-of-band"),
                    "expected override reason, got: {msg}"
                );
            }
            other => panic!("expected InvalidRequest from override, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn approval_decision_overrides_only_apply_to_matching_tool_use_id() {
        // Override is registered for a different tool_use_id — the
        // current dispatch must fall through to the approver.
        let approver: Arc<dyn Approver> = Arc::new(AlwaysAwait);
        let registry = ToolRegistry::new()
            .layer(ApprovalLayer::new(approver))
            .register(Arc::new(EchoTool::new()))
            .unwrap();
        let mut overrides = PendingApprovalDecisions::new();
        overrides.insert("a-different-id", ApprovalDecision::Approve);
        let ctx = ExecutionContext::new().add_extension(overrides);

        let err = registry
            .dispatch("tu-1", "echo", json!({"x": 1}), &ctx)
            .await
            .unwrap_err();
        // Approver runs (no matching override), returns AwaitExternal,
        // layer raises Interrupted.
        assert!(matches!(err, Error::Interrupted { .. }));
    }

    #[tokio::test]
    async fn approval_layer_composes_under_outer_layer() {
        // Cross-layer integration: register `ScopedToolLayer`
        // INNERMOST and `ApprovalLayer` OUTSIDE it (i.e. register
        // ScopedToolLayer first so it ends up nearer the leaf
        // tool, then register ApprovalLayer so it wraps that).
        // The dispatch flow on Approve must be:
        //   ApprovalLayer.call → ScopedToolLayer.call → tool.execute
        // proven by the wrap-counter incrementing AFTER the
        // approver's decision is applied.
        use entelix_core::tools::{ScopedToolLayer, ToolDispatchScope};
        use futures::future::BoxFuture;

        // Scope that counts how often it is asked to wrap a dispatch
        // and otherwise passes the future through untouched.
        struct ApproveAfterScope {
            scope_wraps: Arc<AtomicUsize>,
        }
        impl ToolDispatchScope for ApproveAfterScope {
            fn wrap(
                &self,
                _ctx: ExecutionContext,
                fut: BoxFuture<'static, Result<Value>>,
            ) -> BoxFuture<'static, Result<Value>> {
                self.scope_wraps.fetch_add(1, Ordering::SeqCst);
                fut
            }
        }

        let scope_wraps = Arc::new(AtomicUsize::new(0));
        let scope = ApproveAfterScope {
            scope_wraps: Arc::clone(&scope_wraps),
        };
        let approver: Arc<dyn Approver> = Arc::new(AlwaysApprove);
        let registry = ToolRegistry::new()
            .layer(ScopedToolLayer::new(scope)) // innermost (registered first)
            .layer(ApprovalLayer::new(approver)) // outermost (registered last)
            .register(Arc::new(EchoTool::new()))
            .unwrap();

        registry
            .dispatch("", "echo", json!({"x": 1}), &ExecutionContext::new())
            .await
            .unwrap();
        // Scope wrap fires once = ApprovalLayer approved + flowed
        // into the inner ScopedToolLayer + then the leaf tool.
        assert_eq!(scope_wraps.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn approval_reject_short_circuits_before_inner_scope() {
        // Mirror of the above for the Reject path: the inner
        // ScopedToolLayer must NOT fire when the outer
        // ApprovalLayer rejects. Verifies the layer ordering keeps
        // approval gating outside scope setup — important for
        // perf-sensitive scopes (e.g. Postgres SET LOCAL) that
        // operators don't want to pay for on rejected calls.
        use entelix_core::tools::{ScopedToolLayer, ToolDispatchScope};
        use futures::future::BoxFuture;

        // Same counting pass-through scope as in the Approve-path test.
        struct CountScope {
            wraps: Arc<AtomicUsize>,
        }
        impl ToolDispatchScope for CountScope {
            fn wrap(
                &self,
                _ctx: ExecutionContext,
                fut: BoxFuture<'static, Result<Value>>,
            ) -> BoxFuture<'static, Result<Value>> {
                self.wraps.fetch_add(1, Ordering::SeqCst);
                fut
            }
        }

        let wraps = Arc::new(AtomicUsize::new(0));
        let scope = CountScope {
            wraps: Arc::clone(&wraps),
        };
        let approver: Arc<dyn Approver> = Arc::new(AlwaysReject {
            reason: "no".into(),
        });
        let registry = ToolRegistry::new()
            .layer(ScopedToolLayer::new(scope)) // innermost
            .layer(ApprovalLayer::new(approver)) // outermost
            .register(Arc::new(EchoTool::new()))
            .unwrap();

        let _ = registry
            .dispatch("", "echo", json!({"x": 1}), &ExecutionContext::new())
            .await;
        assert_eq!(
            wraps.load(Ordering::SeqCst),
            0,
            "scope wrap must not fire when the outer ApprovalLayer rejects"
        );
    }
}