entelix_core/context.rs
1//! Request-scope execution context.
2//!
3//! `ExecutionContext` is the only context object that flows through
4//! `Runnable::invoke`, `Tool::execute`, hooks, and codecs. It carries:
5//!
6//! - a `CancellationToken` (F3 mitigation — cooperative cancellation)
7//! - an optional deadline (`tokio::time::Instant`)
8//! - an optional `thread_id` — the durable conversation identifier used
9//! by `Checkpointer` and `SessionGraph` to scope persistence.
10//! - a mandatory `tenant_id` — multi-tenant scope (invariant 11). Defaults
11//! to [`crate::DEFAULT_TENANT_ID`] when not specified explicitly.
12//!
13//! It deliberately does **not** embed a `CredentialProvider` (invariant 10 —
14//! tokens never reach Tool input). Future fields will be added as the layers
15//! materialize: span context, cost meter handle. Each addition is reviewed
16//! against the no-tokens-in-tools rule.
17//!
18//! `ExecutionContext` is `Clone` (cheap) so combinators can fan it out to
19//! parallel branches without lifetime gymnastics.
20
21use std::sync::Arc;
22
23use tokio::time::Instant;
24
25use crate::audit::AuditSinkHandle;
26use crate::cancellation::CancellationToken;
27use crate::extensions::Extensions;
28use crate::tenant_id::TenantId;
29use crate::tools::{
30 CurrentToolInvocation, ToolProgress, ToolProgressSinkHandle, ToolProgressStatus,
31};
32
/// Carrier for request-scope state that every `Runnable`, `Tool`,
/// and codec sees.
///
/// Marked `#[non_exhaustive]`: fields are private; callers always go
/// through [`Self::new`] and the `with_*` builder methods, so adding
/// a new field is a non-breaking change.
///
/// `tenant_id` is a [`TenantId`] (validating `Arc<str>` newtype, see
/// [`crate::tenant_id`]). Cloning the context — done implicitly per
/// tool dispatch and per sub-agent — bumps the underlying refcount
/// instead of allocating. The default-tenant `TenantId` is shared
/// process-wide via a `OnceLock`, so a freshly-built context
/// allocates zero strings on the hot path.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ExecutionContext {
    /// Cooperative cancellation token for this scope. Exposed through
    /// [`Self::cancellation`] and [`Self::is_cancelled`]; [`Self::child`]
    /// derives a child token from it.
    cancellation: CancellationToken,
    /// Optional deadline for the request. `None` until
    /// [`Self::with_deadline`] attaches one.
    deadline: Option<Instant>,
    /// Durable conversation identifier used to scope persistence.
    /// `None` until [`Self::with_thread_id`] attaches one.
    thread_id: Option<String>,
    /// Multi-tenant scope (invariant 11). Always present; starts as
    /// [`TenantId::default()`] and is overridden via
    /// [`Self::with_tenant_id`].
    tenant_id: TenantId,
    /// Per-execute correlation id. `None` until stamped via
    /// [`Self::with_run_id`].
    run_id: Option<String>,
    /// Run id of the parent that dispatched the call this context
    /// represents. `None` at the root; `Some(parent_run)` for every
    /// sub-agent and tool-driven dispatch the parent originates.
    /// LangSmith-style trace-tree consumers reconstruct the
    /// hierarchy from `(run_id, parent_run_id)` edges.
    parent_run_id: Option<String>,
    /// Idempotency key for the current logical call — vendor-side
    /// dedupe identifier shared across every retry attempt of the
    /// same logical call. `RetryService` stamps it on first entry
    /// when absent so two retries of one timed-out request do not
    /// produce two charges (invariant #17 — vendor-authoritative
    /// guarantees beat self-jitter).
    idempotency_key: Option<Arc<str>>,
    /// Typed cross-cutting values, one entry per type. Written via
    /// [`Self::add_extension`] and read via [`Self::extension`]; the
    /// run budget, audit sink, and tool-progress sink are stored here.
    extensions: Extensions,
}
69
70impl Default for ExecutionContext {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl ExecutionContext {
77 /// Create a fresh context with a new cancellation token, no deadline,
78 /// no thread ID, and the [`TenantId::default()`] tenant scope
79 /// (which is the process-wide shared `"default"` `TenantId`).
80 pub fn new() -> Self {
81 Self {
82 cancellation: CancellationToken::new(),
83 deadline: None,
84 thread_id: None,
85 tenant_id: TenantId::default(),
86 run_id: None,
87 parent_run_id: None,
88 idempotency_key: None,
89 extensions: Extensions::new(),
90 }
91 }
92
93 /// Create a context bound to an existing cancellation token (e.g. a parent
94 /// agent's token, so cancelling the parent cascades to children).
95 pub fn with_cancellation(cancellation: CancellationToken) -> Self {
96 Self {
97 cancellation,
98 deadline: None,
99 thread_id: None,
100 tenant_id: TenantId::default(),
101 run_id: None,
102 parent_run_id: None,
103 idempotency_key: None,
104 extensions: Extensions::new(),
105 }
106 }
107
108 /// Attach a deadline. Returns `self` for builder-style chaining.
109 #[must_use]
110 pub const fn with_deadline(mut self, deadline: Instant) -> Self {
111 self.deadline = Some(deadline);
112 self
113 }
114
115 /// Attach a `thread_id` — the durable conversation key used by
116 /// `Checkpointer`s to scope persistence.
117 #[must_use]
118 pub fn with_thread_id(mut self, thread_id: impl Into<String>) -> Self {
119 self.thread_id = Some(thread_id.into());
120 self
121 }
122
123 /// Override the tenant scope. Multi-tenant operators set this per
124 /// request; single-tenant deployments leave it at
125 /// [`TenantId::default()`] (invariant 11).
126 #[must_use]
127 pub fn with_tenant_id(mut self, tenant_id: TenantId) -> Self {
128 self.tenant_id = tenant_id;
129 self
130 }
131
132 /// Attach a `run_id` — the per-execute correlation key the agent
133 /// runtime stamps on every `AgentEvent`, OTel span, and tool
134 /// invocation event. `Agent::execute` generates a fresh UUID v7
135 /// on entry when none is set; recipes pre-allocate it only when
136 /// they need to correlate the run with an external reservation
137 /// (a workflow id minted by the caller, a database row id chosen
138 /// before dispatch). Sub-agents do not inherit the parent's
139 /// `run_id` — they mint their own and the parent's flows
140 /// through [`Self::with_parent_run_id`] for trace-tree
141 /// reconstruction.
142 #[must_use]
143 pub fn with_run_id(mut self, run_id: impl Into<String>) -> Self {
144 self.run_id = Some(run_id.into());
145 self
146 }
147
148 /// Attach a `parent_run_id` — the run id of the calling agent
149 /// when this context represents a sub-agent dispatch. The
150 /// runtime sets this automatically when a parent `Agent::execute`
151 /// recurses into a child via [`crate::AgentContext`]; recipes
152 /// rarely call this directly. LangSmith-style trace-tree
153 /// consumers reconstruct hierarchy from `(run_id, parent_run_id)`
154 /// edges across `AgentEvent::Started`.
155 #[must_use]
156 pub fn with_parent_run_id(mut self, parent_run_id: impl Into<String>) -> Self {
157 self.parent_run_id = Some(parent_run_id.into());
158 self
159 }
160
161 /// Attach a [`crate::RunBudget`] — six-axis usage cap shared
162 /// across the run (parent agent + every sub-agent it
163 /// dispatches). Cloning the context bumps the budget's
164 /// internal `Arc` refcount so sub-agent dispatches accumulate
165 /// into the same counters.
166 ///
167 /// Stored in [`Extensions`] under `RunBudget`'s `TypeId`, so
168 /// a second call replaces the prior budget. Read sites
169 /// reach for it via [`Self::run_budget`]; absent budget
170 /// makes every dispatch site's check / observe a no-op.
171 #[must_use]
172 pub fn with_run_budget(self, budget: crate::RunBudget) -> Self {
173 self.add_extension(budget)
174 }
175
176 /// Borrow the [`crate::RunBudget`] attached via
177 /// [`Self::with_run_budget`], if any. Dispatch sites
178 /// (`ChatModel::complete_full` / `complete_typed` /
179 /// `stream_deltas`, `ToolRegistry::dispatch_*`) gate budget
180 /// checks on `Some(_)` so a context without a budget incurs
181 /// zero overhead beyond the `TypeId` lookup.
182 #[must_use]
183 pub fn run_budget(&self) -> Option<std::sync::Arc<crate::RunBudget>> {
184 self.extension::<crate::RunBudget>()
185 }
186
187 /// Attach an idempotency key — the vendor-side dedupe identifier
188 /// shared across every retry attempt of one logical call.
189 /// Operators that pre-allocate the key (e.g. for cross-process
190 /// dedupe through a sticky job id) call this before dispatch;
191 /// otherwise `RetryService` stamps a UUID on first entry.
192 #[must_use]
193 pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
194 self.idempotency_key = Some(Arc::from(key.into()));
195 self
196 }
197
198 /// Mutable handle to the idempotency-key slot — `RetryService`
199 /// uses this to stamp a fresh key on first entry of a retry
200 /// loop (so attempts 2..N share the slot the first attempt set).
201 /// Outside the retry middleware, prefer
202 /// [`Self::with_idempotency_key`].
203 pub fn ensure_idempotency_key<F>(&mut self, generate: F) -> &str
204 where
205 F: FnOnce() -> String,
206 {
207 if self.idempotency_key.is_none() {
208 self.idempotency_key = Some(Arc::from(generate()));
209 }
210 // SAFETY: branch above guarantees Some.
211 self.idempotency_key.as_deref().unwrap_or("")
212 }
213
214 /// Attach an [`AuditSinkHandle`] for the current run. Sub-agents,
215 /// supervisors, and memory tools look it up via
216 /// [`Self::audit_sink`] and emit typed
217 /// [`crate::audit::AuditSink`] events; the absent handle makes
218 /// every emit site a no-op.
219 ///
220 /// Stored in [`Extensions`] under `AuditSinkHandle`'s `TypeId`,
221 /// so a second call replaces the prior handle (single sink per
222 /// run by design — recipes that need fan-out wrap two sinks
223 /// behind one impl).
224 #[must_use]
225 pub fn with_audit_sink(self, handle: AuditSinkHandle) -> Self {
226 self.add_extension(handle)
227 }
228
229 /// Borrow the [`AuditSinkHandle`] for the current run, if one
230 /// has been attached via [`Self::with_audit_sink`]. Emit sites
231 /// gate on `Some(_)` so a context without a sink incurs no
232 /// overhead beyond the `TypeId` lookup.
233 #[must_use]
234 pub fn audit_sink(&self) -> Option<Arc<AuditSinkHandle>> {
235 self.extension::<AuditSinkHandle>()
236 }
237
238 /// Attach a [`ToolProgressSinkHandle`] for the current run.
239 /// Long-running tools emit phase transitions through
240 /// [`Self::record_phase`] / [`Self::record_phase_with`] and the
241 /// SDK fans the transition into this sink. Absent handle makes
242 /// every emit site a silent no-op.
243 ///
244 /// Stored in [`Extensions`] under `ToolProgressSinkHandle`'s
245 /// `TypeId`, so a second call replaces the prior handle (single
246 /// sink per run by design — recipes that need fan-out wrap two
247 /// sinks behind one impl).
248 #[must_use]
249 pub fn with_tool_progress_sink(self, handle: ToolProgressSinkHandle) -> Self {
250 self.add_extension(handle)
251 }
252
253 /// Borrow the [`ToolProgressSinkHandle`] for the current run, if
254 /// one has been attached.
255 #[must_use]
256 pub fn tool_progress_sink(&self) -> Option<Arc<ToolProgressSinkHandle>> {
257 self.extension::<ToolProgressSinkHandle>()
258 }
259
260 /// Emit a tool-phase transition with no metadata.
261 ///
262 /// Silent no-op when no [`ToolProgressSinkHandle`] is attached or
263 /// when the call is outside a tool dispatch (no
264 /// [`CurrentToolInvocation`] marker on the context). See
265 /// [`Self::record_phase_with`] for the metadata-bearing variant.
266 /// Fire-and-forget — sink failures stay inside the sink and never
267 /// propagate to the tool body (invariant 4 + 18).
268 pub async fn record_phase(&self, phase: impl Into<String> + Send, status: ToolProgressStatus) {
269 self.record_phase_with(phase, status, serde_json::Value::Null)
270 .await;
271 }
272
273 /// Emit a tool-phase transition with structured metadata.
274 ///
275 /// `metadata` flows through to the sink alongside the phase
276 /// identity — UIs render percent-complete / item-counts /
277 /// partial-result counts from this field. Tools that have nothing
278 /// to attach pass [`Self::record_phase`] instead. Fire-and-forget —
279 /// sink failures stay inside the sink.
280 pub async fn record_phase_with(
281 &self,
282 phase: impl Into<String> + Send,
283 status: ToolProgressStatus,
284 metadata: serde_json::Value,
285 ) {
286 let Some(sink) = self.tool_progress_sink() else {
287 return;
288 };
289 let Some(current) = self.extension::<CurrentToolInvocation>() else {
290 return;
291 };
292 let progress = ToolProgress {
293 run_id: self.run_id().map(str::to_owned).unwrap_or_default(),
294 tool_use_id: current.tool_use_id().to_owned(),
295 tool_name: current.tool_name().to_owned(),
296 phase: phase.into(),
297 status,
298 dispatch_elapsed_ms: current.dispatch_elapsed_ms(),
299 metadata,
300 };
301 sink.inner().record_progress(progress).await;
302 }
303
304 /// Attach a typed cross-cutting value to the context's
305 /// [`Extensions`] slot. The returned context carries a fresh
306 /// `Extensions` Arc with `value` registered under `T`'s
307 /// `TypeId`; the caller's previous context is unchanged
308 /// (copy-on-write).
309 ///
310 /// One entry per type — calling this twice with the same `T`
311 /// replaces the earlier value. Operators threading multiple
312 /// values of the same logical category wrap them in a domain
313 /// type (`struct WorkspaceCtx { repo: ..., flags: ... }`) so
314 /// `T` stays the canonical key.
315 ///
316 /// Read back via [`Self::extension`] inside any tool, codec,
317 /// or runnable that sees the context.
318 ///
319 /// **Invariant 10**: do not stash credentials or bearer
320 /// tokens here — `ExecutionContext` flows into `Tool::execute`
321 /// and an extension carrying a token would surface it to
322 /// every tool the agent dispatches. Credentials live in
323 /// transports (`CredentialProvider`).
324 #[must_use]
325 pub fn add_extension<T>(mut self, value: T) -> Self
326 where
327 T: Send + Sync + 'static,
328 {
329 self.extensions = self.extensions.inserted(value);
330 self
331 }
332
333 /// Borrow the cancellation token. Long-running tools should periodically
334 /// check `is_cancelled()` and cooperatively shut down.
335 pub const fn cancellation(&self) -> &CancellationToken {
336 &self.cancellation
337 }
338
339 /// Returns the deadline if one was attached.
340 pub const fn deadline(&self) -> Option<Instant> {
341 self.deadline
342 }
343
344 /// Borrow the thread identifier if one was attached.
345 pub fn thread_id(&self) -> Option<&str> {
346 self.thread_id.as_deref()
347 }
348
349 /// Borrow the tenant identifier (invariant 11). Always present —
350 /// defaults to the process-shared [`TenantId::default()`] when
351 /// not explicitly set.
352 pub const fn tenant_id(&self) -> &TenantId {
353 &self.tenant_id
354 }
355
356 /// Borrow the per-execute correlation id, if one has been
357 /// stamped (typically by `Agent::execute` on entry, or by a
358 /// caller pre-allocating via [`Self::with_run_id`]).
359 pub fn run_id(&self) -> Option<&str> {
360 self.run_id.as_deref()
361 }
362
363 /// Borrow the parent run id, if this context represents a
364 /// sub-agent or tool-driven dispatch under a parent
365 /// `Agent::execute`. `None` at the root.
366 pub fn parent_run_id(&self) -> Option<&str> {
367 self.parent_run_id.as_deref()
368 }
369
370 /// Borrow the idempotency key for the current logical call, if
371 /// one has been stamped (by `RetryService` on first entry of a
372 /// retry loop, or by a caller pre-allocating via
373 /// [`Self::with_idempotency_key`]). `DirectTransport` and the
374 /// cloud transports forward this on the `Idempotency-Key`
375 /// request header so vendors dedupe retries server-side.
376 pub fn idempotency_key(&self) -> Option<&str> {
377 self.idempotency_key.as_deref()
378 }
379
380 /// Borrow the typed cross-cutting carrier. Use
381 /// [`Self::extension`] for the common single-type lookup;
382 /// reach for the carrier directly only when iterating over
383 /// cardinality or composing a downstream context that should
384 /// inherit the same set.
385 pub const fn extensions(&self) -> &Extensions {
386 &self.extensions
387 }
388
389 /// Look up the extension entry registered for `T`, returning
390 /// a refcounted handle independent of the context's lifetime.
391 /// Returns `None` when the entry is absent — operators reading
392 /// optional cross-cutting state code their own default.
393 #[must_use]
394 pub fn extension<T>(&self) -> Option<Arc<T>>
395 where
396 T: Send + Sync + 'static,
397 {
398 self.extensions.get::<T>()
399 }
400
401 /// Convenience: did the cancellation token fire?
402 pub fn is_cancelled(&self) -> bool {
403 self.cancellation.is_cancelled()
404 }
405
406 /// Derive a scoped child context. The child inherits `deadline`,
407 /// `thread_id`, and `tenant_id` but holds a *child* cancellation
408 /// token: cancelling the child does NOT cancel the parent, but
409 /// cancelling the parent cascades to the child.
410 ///
411 /// Use for scope-bounded fan-out (e.g. [`scatter`]) where a
412 /// fail-fast branch should signal still-running siblings to
413 /// abort cooperatively without tearing the whole request down.
414 ///
415 /// [`scatter`]: ../entelix_graph/fn.scatter.html
416 #[must_use]
417 pub fn child(&self) -> Self {
418 Self {
419 cancellation: self.cancellation.child_token(),
420 deadline: self.deadline,
421 thread_id: self.thread_id.clone(),
422 tenant_id: self.tenant_id.clone(),
423 run_id: self.run_id.clone(),
424 parent_run_id: self.parent_run_id.clone(),
425 idempotency_key: self.idempotency_key.clone(),
426 extensions: self.extensions.clone(),
427 }
428 }
429}
430
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod extension_tests {
    use super::*;

    /// Minimal domain type used as the extension payload in these tests.
    #[derive(Debug, PartialEq, Eq)]
    struct WorkspaceCtx {
        repo: &'static str,
    }

    /// The payload every test registers and expects to read back.
    fn sample() -> WorkspaceCtx {
        WorkspaceCtx { repo: "entelix" }
    }

    /// A fresh context carrying only the sample payload.
    fn seeded() -> ExecutionContext {
        ExecutionContext::new().add_extension(sample())
    }

    /// A new context starts with an empty extension set.
    #[test]
    fn fresh_context_has_no_extensions() {
        let fresh = ExecutionContext::new();
        assert!(fresh.extensions().is_empty());
        assert!(fresh.extension::<WorkspaceCtx>().is_none());
    }

    /// A registered value is readable by type and counted once.
    #[test]
    fn add_extension_threads_typed_value() {
        let ctx = seeded();
        let stored = ctx.extension::<WorkspaceCtx>();
        assert_eq!(stored.as_deref(), Some(&sample()));
        assert_eq!(ctx.extensions().len(), 1);
    }

    /// Extending a clone leaves the source context untouched.
    #[test]
    fn add_extension_is_copy_on_write() {
        let base = ExecutionContext::new();
        let derived = base.clone().add_extension(sample());
        // The source context still has no entry.
        assert!(base.extension::<WorkspaceCtx>().is_none());
        // The derived context carries the value.
        assert!(derived.extension::<WorkspaceCtx>().is_some());
    }

    /// `child()` carries the parent's extensions over.
    #[test]
    fn child_inherits_extensions() {
        let parent = seeded();
        let stored = parent.child().extension::<WorkspaceCtx>();
        assert_eq!(stored.as_deref(), Some(&sample()));
    }

    /// The handle returned by `extension` stays valid after the
    /// context that produced it is dropped.
    #[test]
    fn extension_arc_outlives_dropped_context() {
        let ctx = seeded();
        let handle = ctx.extension::<WorkspaceCtx>().unwrap();
        drop(ctx);
        assert_eq!(*handle, sample());
    }
}