Skip to main content

entelix_core/
context.rs

//! Request-scope execution context.
//!
//! `ExecutionContext` is the only context object that flows through
//! `Runnable::invoke`, `Tool::execute`, hooks, and codecs. It carries:
//!
//! - a `CancellationToken` (F3 mitigation — cooperative cancellation)
//! - an optional deadline (`tokio::time::Instant`)
//! - an optional `thread_id` — the durable conversation identifier used
//!   by `Checkpointer` and `SessionGraph` to scope persistence.
//! - a mandatory `tenant_id` — multi-tenant scope (invariant 11). Defaults
//!   to [`crate::DEFAULT_TENANT_ID`] when not specified explicitly.
//! - an optional `run_id` and `parent_run_id` — per-execute correlation
//!   ids from which trace-tree consumers reconstruct the run hierarchy.
//! - an optional idempotency key — shared across every retry attempt of
//!   one logical call.
//! - an `Extensions` carrier — typed cross-cutting values such as the run
//!   budget, the audit sink, and the tool-progress sink.
//!
//! It deliberately does **not** embed a `CredentialProvider` (invariant 10 —
//! tokens never reach Tool input). Future fields will be added as the layers
//! materialize: span context, cost meter handle. Each addition is reviewed
//! against the no-tokens-in-tools rule.
//!
//! `ExecutionContext` is `Clone` (cheap) so combinators can fan it out to
//! parallel branches without lifetime gymnastics.
20
21use std::sync::Arc;
22
23use tokio::time::Instant;
24
25use crate::audit::AuditSinkHandle;
26use crate::cancellation::CancellationToken;
27use crate::extensions::Extensions;
28use crate::tenant_id::TenantId;
29use crate::tools::{
30    CurrentToolInvocation, ToolProgress, ToolProgressSinkHandle, ToolProgressStatus,
31};
32
33/// Carrier for request-scope state that every `Runnable`, `Tool`,
34/// and codec sees.
35///
36/// Marked `#[non_exhaustive]`: fields are private; callers always go
37/// through [`Self::new`] and the `with_*` builder methods, so adding
38/// a new field is a non-breaking change.
39///
40/// `tenant_id` is a [`TenantId`] (validating `Arc<str>` newtype, see
41/// [`crate::tenant_id`]). Cloning the context — done implicitly per
42/// tool dispatch and per sub-agent — bumps the underlying refcount
43/// instead of allocating. The default-tenant `TenantId` is shared
44/// process-wide via a `OnceLock`, so a freshly-built context
45/// allocates zero strings on the hot path.
46#[derive(Clone, Debug)]
47#[non_exhaustive]
48pub struct ExecutionContext {
49    cancellation: CancellationToken,
50    deadline: Option<Instant>,
51    thread_id: Option<String>,
52    tenant_id: TenantId,
53    run_id: Option<String>,
54    /// Run id of the parent that dispatched the call this context
55    /// represents. `None` at the root; `Some(parent_run)` for every
56    /// sub-agent and tool-driven dispatch the parent originates.
57    /// LangSmith-style trace-tree consumers reconstruct the
58    /// hierarchy from `(run_id, parent_run_id)` edges.
59    parent_run_id: Option<String>,
60    /// Idempotency key for the current logical call — vendor-side
61    /// dedupe identifier shared across every retry attempt of the
62    /// same logical call. `RetryService` stamps it on first entry
63    /// when absent so two retries of one timed-out request do not
64    /// produce two charges (invariant #17 — vendor-authoritative
65    /// guarantees beat self-jitter).
66    idempotency_key: Option<Arc<str>>,
67    extensions: Extensions,
68}
69
70impl Default for ExecutionContext {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl ExecutionContext {
77    /// Create a fresh context with a new cancellation token, no deadline,
78    /// no thread ID, and the [`TenantId::default()`] tenant scope
79    /// (which is the process-wide shared `"default"` `TenantId`).
80    pub fn new() -> Self {
81        Self {
82            cancellation: CancellationToken::new(),
83            deadline: None,
84            thread_id: None,
85            tenant_id: TenantId::default(),
86            run_id: None,
87            parent_run_id: None,
88            idempotency_key: None,
89            extensions: Extensions::new(),
90        }
91    }
92
93    /// Create a context bound to an existing cancellation token (e.g. a parent
94    /// agent's token, so cancelling the parent cascades to children).
95    pub fn with_cancellation(cancellation: CancellationToken) -> Self {
96        Self {
97            cancellation,
98            deadline: None,
99            thread_id: None,
100            tenant_id: TenantId::default(),
101            run_id: None,
102            parent_run_id: None,
103            idempotency_key: None,
104            extensions: Extensions::new(),
105        }
106    }
107
108    /// Attach a deadline. Returns `self` for builder-style chaining.
109    #[must_use]
110    pub const fn with_deadline(mut self, deadline: Instant) -> Self {
111        self.deadline = Some(deadline);
112        self
113    }
114
115    /// Attach a `thread_id` — the durable conversation key used by
116    /// `Checkpointer`s to scope persistence.
117    #[must_use]
118    pub fn with_thread_id(mut self, thread_id: impl Into<String>) -> Self {
119        self.thread_id = Some(thread_id.into());
120        self
121    }
122
123    /// Override the tenant scope. Multi-tenant operators set this per
124    /// request; single-tenant deployments leave it at
125    /// [`TenantId::default()`] (invariant 11).
126    #[must_use]
127    pub fn with_tenant_id(mut self, tenant_id: TenantId) -> Self {
128        self.tenant_id = tenant_id;
129        self
130    }
131
132    /// Attach a `run_id` — the per-execute correlation key the agent
133    /// runtime stamps on every `AgentEvent`, OTel span, and tool
134    /// invocation event. `Agent::execute` generates a fresh UUID v7
135    /// on entry when none is set; recipes pre-allocate it only when
136    /// they need to correlate the run with an external reservation
137    /// (a workflow id minted by the caller, a database row id chosen
138    /// before dispatch). Sub-agents do not inherit the parent's
139    /// `run_id` — they mint their own and the parent's flows
140    /// through [`Self::with_parent_run_id`] for trace-tree
141    /// reconstruction.
142    #[must_use]
143    pub fn with_run_id(mut self, run_id: impl Into<String>) -> Self {
144        self.run_id = Some(run_id.into());
145        self
146    }
147
148    /// Attach a `parent_run_id` — the run id of the calling agent
149    /// when this context represents a sub-agent dispatch. The
150    /// runtime sets this automatically when a parent `Agent::execute`
151    /// recurses into a child via [`crate::AgentContext`]; recipes
152    /// rarely call this directly. LangSmith-style trace-tree
153    /// consumers reconstruct hierarchy from `(run_id, parent_run_id)`
154    /// edges across `AgentEvent::Started`.
155    #[must_use]
156    pub fn with_parent_run_id(mut self, parent_run_id: impl Into<String>) -> Self {
157        self.parent_run_id = Some(parent_run_id.into());
158        self
159    }
160
161    /// Attach a [`crate::RunBudget`] — six-axis usage cap shared
162    /// across the run (parent agent + every sub-agent it
163    /// dispatches). Cloning the context bumps the budget's
164    /// internal `Arc` refcount so sub-agent dispatches accumulate
165    /// into the same counters.
166    ///
167    /// Stored in [`Extensions`] under `RunBudget`'s `TypeId`, so
168    /// a second call replaces the prior budget. Read sites
169    /// reach for it via [`Self::run_budget`]; absent budget
170    /// makes every dispatch site's check / observe a no-op.
171    #[must_use]
172    pub fn with_run_budget(self, budget: crate::RunBudget) -> Self {
173        self.add_extension(budget)
174    }
175
176    /// Borrow the [`crate::RunBudget`] attached via
177    /// [`Self::with_run_budget`], if any. Dispatch sites
178    /// (`ChatModel::complete_full` / `complete_typed` /
179    /// `stream_deltas`, `ToolRegistry::dispatch_*`) gate budget
180    /// checks on `Some(_)` so a context without a budget incurs
181    /// zero overhead beyond the `TypeId` lookup.
182    #[must_use]
183    pub fn run_budget(&self) -> Option<std::sync::Arc<crate::RunBudget>> {
184        self.extension::<crate::RunBudget>()
185    }
186
187    /// Attach an idempotency key — the vendor-side dedupe identifier
188    /// shared across every retry attempt of one logical call.
189    /// Operators that pre-allocate the key (e.g. for cross-process
190    /// dedupe through a sticky job id) call this before dispatch;
191    /// otherwise `RetryService` stamps a UUID on first entry.
192    #[must_use]
193    pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
194        self.idempotency_key = Some(Arc::from(key.into()));
195        self
196    }
197
198    /// Mutable handle to the idempotency-key slot — `RetryService`
199    /// uses this to stamp a fresh key on first entry of a retry
200    /// loop (so attempts 2..N share the slot the first attempt set).
201    /// Outside the retry middleware, prefer
202    /// [`Self::with_idempotency_key`].
203    pub fn ensure_idempotency_key<F>(&mut self, generate: F) -> &str
204    where
205        F: FnOnce() -> String,
206    {
207        if self.idempotency_key.is_none() {
208            self.idempotency_key = Some(Arc::from(generate()));
209        }
210        // SAFETY: branch above guarantees Some.
211        self.idempotency_key.as_deref().unwrap_or("")
212    }
213
214    /// Attach an [`AuditSinkHandle`] for the current run. Sub-agents,
215    /// supervisors, and memory tools look it up via
216    /// [`Self::audit_sink`] and emit typed
217    /// [`crate::audit::AuditSink`] events; the absent handle makes
218    /// every emit site a no-op.
219    ///
220    /// Stored in [`Extensions`] under `AuditSinkHandle`'s `TypeId`,
221    /// so a second call replaces the prior handle (single sink per
222    /// run by design — recipes that need fan-out wrap two sinks
223    /// behind one impl).
224    #[must_use]
225    pub fn with_audit_sink(self, handle: AuditSinkHandle) -> Self {
226        self.add_extension(handle)
227    }
228
229    /// Borrow the [`AuditSinkHandle`] for the current run, if one
230    /// has been attached via [`Self::with_audit_sink`]. Emit sites
231    /// gate on `Some(_)` so a context without a sink incurs no
232    /// overhead beyond the `TypeId` lookup.
233    #[must_use]
234    pub fn audit_sink(&self) -> Option<Arc<AuditSinkHandle>> {
235        self.extension::<AuditSinkHandle>()
236    }
237
238    /// Attach a [`ToolProgressSinkHandle`] for the current run.
239    /// Long-running tools emit phase transitions through
240    /// [`Self::record_phase`] / [`Self::record_phase_with`] and the
241    /// SDK fans the transition into this sink. Absent handle makes
242    /// every emit site a silent no-op.
243    ///
244    /// Stored in [`Extensions`] under `ToolProgressSinkHandle`'s
245    /// `TypeId`, so a second call replaces the prior handle (single
246    /// sink per run by design — recipes that need fan-out wrap two
247    /// sinks behind one impl).
248    #[must_use]
249    pub fn with_tool_progress_sink(self, handle: ToolProgressSinkHandle) -> Self {
250        self.add_extension(handle)
251    }
252
253    /// Borrow the [`ToolProgressSinkHandle`] for the current run, if
254    /// one has been attached.
255    #[must_use]
256    pub fn tool_progress_sink(&self) -> Option<Arc<ToolProgressSinkHandle>> {
257        self.extension::<ToolProgressSinkHandle>()
258    }
259
260    /// Emit a tool-phase transition with no metadata.
261    ///
262    /// Silent no-op when no [`ToolProgressSinkHandle`] is attached or
263    /// when the call is outside a tool dispatch (no
264    /// [`CurrentToolInvocation`] marker on the context). See
265    /// [`Self::record_phase_with`] for the metadata-bearing variant.
266    /// Fire-and-forget — sink failures stay inside the sink and never
267    /// propagate to the tool body (invariant 4 + 18).
268    pub async fn record_phase(&self, phase: impl Into<String> + Send, status: ToolProgressStatus) {
269        self.record_phase_with(phase, status, serde_json::Value::Null)
270            .await;
271    }
272
273    /// Emit a tool-phase transition with structured metadata.
274    ///
275    /// `metadata` flows through to the sink alongside the phase
276    /// identity — UIs render percent-complete / item-counts /
277    /// partial-result counts from this field. Tools that have nothing
278    /// to attach pass [`Self::record_phase`] instead. Fire-and-forget —
279    /// sink failures stay inside the sink.
280    pub async fn record_phase_with(
281        &self,
282        phase: impl Into<String> + Send,
283        status: ToolProgressStatus,
284        metadata: serde_json::Value,
285    ) {
286        let Some(sink) = self.tool_progress_sink() else {
287            return;
288        };
289        let Some(current) = self.extension::<CurrentToolInvocation>() else {
290            return;
291        };
292        let progress = ToolProgress {
293            run_id: self.run_id().map(str::to_owned).unwrap_or_default(),
294            tool_use_id: current.tool_use_id().to_owned(),
295            tool_name: current.tool_name().to_owned(),
296            phase: phase.into(),
297            status,
298            dispatch_elapsed_ms: current.dispatch_elapsed_ms(),
299            metadata,
300        };
301        sink.inner().record_progress(progress).await;
302    }
303
304    /// Attach a typed cross-cutting value to the context's
305    /// [`Extensions`] slot. The returned context carries a fresh
306    /// `Extensions` Arc with `value` registered under `T`'s
307    /// `TypeId`; the caller's previous context is unchanged
308    /// (copy-on-write).
309    ///
310    /// One entry per type — calling this twice with the same `T`
311    /// replaces the earlier value. Operators threading multiple
312    /// values of the same logical category wrap them in a domain
313    /// type (`struct WorkspaceCtx { repo: ..., flags: ... }`) so
314    /// `T` stays the canonical key.
315    ///
316    /// Read back via [`Self::extension`] inside any tool, codec,
317    /// or runnable that sees the context.
318    ///
319    /// **Invariant 10**: do not stash credentials or bearer
320    /// tokens here — `ExecutionContext` flows into `Tool::execute`
321    /// and an extension carrying a token would surface it to
322    /// every tool the agent dispatches. Credentials live in
323    /// transports (`CredentialProvider`).
324    #[must_use]
325    pub fn add_extension<T>(mut self, value: T) -> Self
326    where
327        T: Send + Sync + 'static,
328    {
329        self.extensions = self.extensions.inserted(value);
330        self
331    }
332
333    /// Borrow the cancellation token. Long-running tools should periodically
334    /// check `is_cancelled()` and cooperatively shut down.
335    pub const fn cancellation(&self) -> &CancellationToken {
336        &self.cancellation
337    }
338
339    /// Returns the deadline if one was attached.
340    pub const fn deadline(&self) -> Option<Instant> {
341        self.deadline
342    }
343
344    /// Borrow the thread identifier if one was attached.
345    pub fn thread_id(&self) -> Option<&str> {
346        self.thread_id.as_deref()
347    }
348
349    /// Borrow the tenant identifier (invariant 11). Always present —
350    /// defaults to the process-shared [`TenantId::default()`] when
351    /// not explicitly set.
352    pub const fn tenant_id(&self) -> &TenantId {
353        &self.tenant_id
354    }
355
356    /// Borrow the per-execute correlation id, if one has been
357    /// stamped (typically by `Agent::execute` on entry, or by a
358    /// caller pre-allocating via [`Self::with_run_id`]).
359    pub fn run_id(&self) -> Option<&str> {
360        self.run_id.as_deref()
361    }
362
363    /// Borrow the parent run id, if this context represents a
364    /// sub-agent or tool-driven dispatch under a parent
365    /// `Agent::execute`. `None` at the root.
366    pub fn parent_run_id(&self) -> Option<&str> {
367        self.parent_run_id.as_deref()
368    }
369
370    /// Borrow the idempotency key for the current logical call, if
371    /// one has been stamped (by `RetryService` on first entry of a
372    /// retry loop, or by a caller pre-allocating via
373    /// [`Self::with_idempotency_key`]). `DirectTransport` and the
374    /// cloud transports forward this on the `Idempotency-Key`
375    /// request header so vendors dedupe retries server-side.
376    pub fn idempotency_key(&self) -> Option<&str> {
377        self.idempotency_key.as_deref()
378    }
379
380    /// Borrow the typed cross-cutting carrier. Use
381    /// [`Self::extension`] for the common single-type lookup;
382    /// reach for the carrier directly only when iterating over
383    /// cardinality or composing a downstream context that should
384    /// inherit the same set.
385    pub const fn extensions(&self) -> &Extensions {
386        &self.extensions
387    }
388
389    /// Look up the extension entry registered for `T`, returning
390    /// a refcounted handle independent of the context's lifetime.
391    /// Returns `None` when the entry is absent — operators reading
392    /// optional cross-cutting state code their own default.
393    #[must_use]
394    pub fn extension<T>(&self) -> Option<Arc<T>>
395    where
396        T: Send + Sync + 'static,
397    {
398        self.extensions.get::<T>()
399    }
400
401    /// Convenience: did the cancellation token fire?
402    pub fn is_cancelled(&self) -> bool {
403        self.cancellation.is_cancelled()
404    }
405
406    /// Derive a scoped child context. The child inherits `deadline`,
407    /// `thread_id`, and `tenant_id` but holds a *child* cancellation
408    /// token: cancelling the child does NOT cancel the parent, but
409    /// cancelling the parent cascades to the child.
410    ///
411    /// Use for scope-bounded fan-out (e.g. [`scatter`]) where a
412    /// fail-fast branch should signal still-running siblings to
413    /// abort cooperatively without tearing the whole request down.
414    ///
415    /// [`scatter`]: ../entelix_graph/fn.scatter.html
416    #[must_use]
417    pub fn child(&self) -> Self {
418        Self {
419            cancellation: self.cancellation.child_token(),
420            deadline: self.deadline,
421            thread_id: self.thread_id.clone(),
422            tenant_id: self.tenant_id.clone(),
423            run_id: self.run_id.clone(),
424            parent_run_id: self.parent_run_id.clone(),
425            idempotency_key: self.idempotency_key.clone(),
426            extensions: self.extensions.clone(),
427        }
428    }
429}
430
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod extension_tests {
    use super::*;

    /// Minimal domain type used as the extension payload in these tests.
    #[derive(Debug, PartialEq, Eq)]
    struct WorkspaceCtx {
        repo: &'static str,
    }

    /// The single payload value every test below attaches and expects.
    fn workspace() -> WorkspaceCtx {
        WorkspaceCtx { repo: "entelix" }
    }

    /// A brand-new context exposes an empty carrier and no typed entry.
    #[test]
    fn fresh_context_has_no_extensions() {
        let fresh = ExecutionContext::new();
        assert!(fresh.extensions().is_empty());
        assert!(fresh.extension::<WorkspaceCtx>().is_none());
    }

    /// A value attached with `add_extension` is readable by its type.
    #[test]
    fn add_extension_threads_typed_value() {
        let ctx = ExecutionContext::new().add_extension(workspace());
        let stored = ctx.extension::<WorkspaceCtx>().unwrap();
        assert_eq!(*stored, workspace());
        assert_eq!(ctx.extensions().len(), 1);
    }

    /// Extending a clone leaves the context it was cloned from untouched.
    #[test]
    fn add_extension_is_copy_on_write() {
        let base = ExecutionContext::new();
        let derived = base.clone().add_extension(workspace());
        // The base context never sees the new entry...
        assert!(base.extension::<WorkspaceCtx>().is_none());
        // ...while the derived one carries it.
        assert!(derived.extension::<WorkspaceCtx>().is_some());
    }

    /// `child()` hands the parent's extensions down to the child.
    #[test]
    fn child_inherits_extensions() {
        let parent = ExecutionContext::new().add_extension(workspace());
        let inherited = parent.child().extension::<WorkspaceCtx>().unwrap();
        assert_eq!(*inherited, workspace());
    }

    /// The handle returned by `extension` stays valid after the context
    /// it came from has been dropped.
    #[test]
    fn extension_arc_outlives_dropped_context() {
        let ctx = ExecutionContext::new().add_extension(workspace());
        let handle = ctx.extension::<WorkspaceCtx>().unwrap();
        drop(ctx);
        assert_eq!(*handle, workspace());
    }
}
483}