entelix-core 0.5.4

entelix DAG root — IR, codecs, transports, Tool trait + ToolRegistry, auth, ExecutionContext, ModelInvocation/ToolInvocation Service spine, StreamAggregator
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
//! Request-scope execution context.
//!
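//! `ExecutionContext` is the only context object that flows through
//! `Runnable::invoke`, `Tool::execute`, hooks, and codecs. It carries:
//!
//! - a `CancellationToken` (F3 mitigation — cooperative cancellation)
//! - an optional deadline (`tokio::time::Instant`)
//! - an optional `thread_id` — the durable conversation identifier used
//!   by `Checkpointer` and `SessionGraph` to scope persistence.
//! - a mandatory `tenant_id` — multi-tenant scope (invariant 11). Defaults
//!   to [`crate::DEFAULT_TENANT_ID`] when not specified explicitly.
//! - an optional `run_id` / `parent_run_id` pair — the per-execute
//!   correlation id and the id of the run that dispatched it, used to
//!   reconstruct trace trees.
//! - an optional idempotency key — shared across every retry attempt of
//!   one logical call so vendors can dedupe server-side.
//! - a typed `Extensions` carrier — cross-cutting values such as the run
//!   budget, audit sink, and tool-progress sink, keyed by `TypeId`.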
//!
//! It deliberately does **not** embed a `CredentialProvider` (invariant 10 —
//! tokens never reach Tool input). Future fields will be added as the layers
//! materialize: span context, cost meter handle. Each addition is reviewed
//! against the no-tokens-in-tools rule.
//!
//! `ExecutionContext` is `Clone` (cheap) so combinators can fan it out to
//! parallel branches without lifetime gymnastics.

use std::sync::Arc;

use tokio::time::Instant;

use crate::audit::AuditSinkHandle;
use crate::cancellation::CancellationToken;
use crate::extensions::Extensions;
use crate::tenant_id::TenantId;
use crate::tools::{
    CurrentToolInvocation, ToolProgress, ToolProgressSinkHandle, ToolProgressStatus,
};

/// Carrier for request-scope state that every `Runnable`, `Tool`,
/// and codec sees.
///
/// Marked `#[non_exhaustive]`: fields are private; callers always go
/// through [`Self::new`] and the `with_*` builder methods, so adding
/// a new field is a non-breaking change.
///
/// `tenant_id` is a [`TenantId`] (validating `Arc<str>` newtype, see
/// [`crate::tenant_id`]). Cloning the context — done implicitly per
/// tool dispatch and per sub-agent — bumps the underlying refcount
/// instead of allocating. The default-tenant `TenantId` is shared
/// process-wide via a `OnceLock`, so a freshly-built context
/// allocates zero strings on the hot path.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ExecutionContext {
    /// Cooperative cancellation signal for this request scope; see
    /// [`Self::cancellation`] and [`Self::is_cancelled`].
    cancellation: CancellationToken,
    /// Optional point in time attached via [`Self::with_deadline`];
    /// `None` until a deadline is set.
    deadline: Option<Instant>,
    /// Durable conversation key used to scope persistence; `None`
    /// until attached via [`Self::with_thread_id`].
    thread_id: Option<String>,
    /// Multi-tenant scope (invariant 11). Always present — starts as
    /// [`TenantId::default()`] and is overridden via
    /// [`Self::with_tenant_id`].
    tenant_id: TenantId,
    /// Per-execute correlation id; `None` until attached via
    /// [`Self::with_run_id`].
    run_id: Option<String>,
    /// Run id of the parent that dispatched the call this context
    /// represents. `None` at the root; `Some(parent_run)` for every
    /// sub-agent and tool-driven dispatch the parent originates.
    /// LangSmith-style trace-tree consumers reconstruct the
    /// hierarchy from `(run_id, parent_run_id)` edges.
    parent_run_id: Option<String>,
    /// Idempotency key for the current logical call — vendor-side
    /// dedupe identifier shared across every retry attempt of the
    /// same logical call. `RetryService` stamps it on first entry
    /// when absent so two retries of one timed-out request do not
    /// produce two charges (invariant #17 — vendor-authoritative
    /// guarantees beat self-jitter).
    idempotency_key: Option<Arc<str>>,
    /// Typed cross-cutting values keyed by `TypeId` (one entry per
    /// type); written via [`Self::add_extension`] and read via
    /// [`Self::extension`].
    extensions: Extensions,
}

impl Default for ExecutionContext {
    fn default() -> Self {
        Self::new()
    }
}

impl ExecutionContext {
    /// Create a fresh context with a new cancellation token, no deadline,
    /// no thread ID, and the [`TenantId::default()`] tenant scope
    /// (which is the process-wide shared `"default"` `TenantId`).
    pub fn new() -> Self {
        Self {
            cancellation: CancellationToken::new(),
            deadline: None,
            thread_id: None,
            tenant_id: TenantId::default(),
            run_id: None,
            parent_run_id: None,
            idempotency_key: None,
            extensions: Extensions::new(),
        }
    }

    /// Build a context around an existing cancellation token (e.g. a
    /// parent agent's token, so cancelling the parent cascades to
    /// children). Every other field starts in its empty / default
    /// state, with the tenant scope set to [`TenantId::default()`].
    pub fn with_cancellation(cancellation: CancellationToken) -> Self {
        let tenant_id = TenantId::default();
        let extensions = Extensions::new();
        Self {
            cancellation,
            deadline: None,
            thread_id: None,
            tenant_id,
            run_id: None,
            parent_run_id: None,
            idempotency_key: None,
            extensions,
        }
    }

    /// Attach a deadline, replacing any deadline set earlier.
    ///
    /// Consumes the context and returns it for builder-style
    /// chaining; read the value back with [`Self::deadline`].
    #[must_use]
    pub const fn with_deadline(mut self, deadline: Instant) -> Self {
        self.deadline = Some(deadline);
        self
    }

    /// Attach a `thread_id` — the durable conversation key used by
    /// `Checkpointer`s to scope persistence.
    #[must_use]
    pub fn with_thread_id(mut self, thread_id: impl Into<String>) -> Self {
        self.thread_id = Some(thread_id.into());
        self
    }

    /// Override the tenant scope. Multi-tenant operators set this per
    /// request; single-tenant deployments leave it at
    /// [`TenantId::default()`] (invariant 11).
    #[must_use]
    pub fn with_tenant_id(mut self, tenant_id: TenantId) -> Self {
        self.tenant_id = tenant_id;
        self
    }

    /// Attach a `run_id` — the per-execute correlation key the agent
    /// runtime stamps on every `AgentEvent`, OTel span, and tool
    /// invocation event. `Agent::execute` generates a fresh UUID v7
    /// on entry when none is set; recipes pre-allocate it only when
    /// they need to correlate the run with an external reservation
    /// (a workflow id minted by the caller, a database row id chosen
    /// before dispatch). Sub-agents do not inherit the parent's
    /// `run_id` — they mint their own and the parent's flows
    /// through [`Self::with_parent_run_id`] for trace-tree
    /// reconstruction.
    #[must_use]
    pub fn with_run_id(mut self, run_id: impl Into<String>) -> Self {
        self.run_id = Some(run_id.into());
        self
    }

    /// Attach a `parent_run_id` — the run id of the calling agent
    /// when this context represents a sub-agent dispatch. The
    /// runtime sets this automatically when a parent `Agent::execute`
    /// recurses into a child via [`crate::AgentContext`]; recipes
    /// rarely call this directly. LangSmith-style trace-tree
    /// consumers reconstruct hierarchy from `(run_id, parent_run_id)`
    /// edges across `AgentEvent::Started`.
    #[must_use]
    pub fn with_parent_run_id(mut self, parent_run_id: impl Into<String>) -> Self {
        self.parent_run_id = Some(parent_run_id.into());
        self
    }

    /// Attach a [`crate::RunBudget`] — the six-axis usage cap shared
    /// across the whole run (the parent agent plus every sub-agent
    /// it dispatches). Cloning the context bumps the budget's
    /// internal `Arc` refcount, so sub-agent dispatches accumulate
    /// into the same counters.
    ///
    /// The budget lives in [`Extensions`] under `RunBudget`'s
    /// `TypeId`; attaching a second budget therefore replaces the
    /// first. Read it back with [`Self::run_budget`]. When no budget
    /// is attached, every dispatch site's check / observe is a
    /// no-op.
    #[must_use]
    pub fn with_run_budget(self, budget: crate::RunBudget) -> Self {
        self.add_extension::<crate::RunBudget>(budget)
    }

    /// Fetch the [`crate::RunBudget`] attached via
    /// [`Self::with_run_budget`], or `None` when no budget was set.
    ///
    /// Dispatch sites (`ChatModel::complete_full` /
    /// `complete_typed` / `stream_deltas`,
    /// `ToolRegistry::dispatch_*`) gate their budget checks on
    /// `Some(_)`, so a context without a budget pays nothing beyond
    /// the `TypeId` lookup.
    #[must_use]
    pub fn run_budget(&self) -> Option<std::sync::Arc<crate::RunBudget>> {
        self.extensions.get::<crate::RunBudget>()
    }

    /// Attach an idempotency key — the vendor-side dedupe identifier
    /// shared across every retry attempt of one logical call.
    /// Operators that pre-allocate the key (e.g. for cross-process
    /// dedupe through a sticky job id) call this before dispatch;
    /// otherwise `RetryService` stamps a UUID on first entry.
    #[must_use]
    pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
        self.idempotency_key = Some(Arc::from(key.into()));
        self
    }

    /// Return the idempotency key for the current logical call,
    /// stamping one first if the slot is still empty.
    ///
    /// `generate` is invoked at most once, and only when no key is
    /// present; an existing key is returned untouched. `RetryService`
    /// uses this on first entry of a retry loop so attempts 2..N
    /// share the key the first attempt set. Outside the retry
    /// middleware, prefer [`Self::with_idempotency_key`].
    pub fn ensure_idempotency_key<F>(&mut self, generate: F) -> &str
    where
        F: FnOnce() -> String,
    {
        // `get_or_insert_with` fills the slot only when it is `None`
        // and always hands back the stored value, so no fallback
        // value is needed for an "impossible" empty case.
        let key = self
            .idempotency_key
            .get_or_insert_with(|| Arc::from(generate()));
        &**key
    }

    /// Attach the [`AuditSinkHandle`] for the current run.
    /// Sub-agents, supervisors, and memory tools find it through
    /// [`Self::audit_sink`] and emit typed
    /// [`crate::audit::AuditSink`] events; with no handle attached,
    /// every emit site is a no-op.
    ///
    /// The handle lives in [`Extensions`] under `AuditSinkHandle`'s
    /// `TypeId`, so attaching a second handle replaces the first
    /// (single sink per run by design — recipes that need fan-out
    /// wrap two sinks behind one impl).
    #[must_use]
    pub fn with_audit_sink(self, handle: AuditSinkHandle) -> Self {
        self.add_extension::<AuditSinkHandle>(handle)
    }

    /// Fetch the [`AuditSinkHandle`] for the current run, or `None`
    /// when [`Self::with_audit_sink`] was never called. Emit sites
    /// gate on `Some(_)`, so a context without a sink pays nothing
    /// beyond the `TypeId` lookup.
    #[must_use]
    pub fn audit_sink(&self) -> Option<Arc<AuditSinkHandle>> {
        self.extensions.get::<AuditSinkHandle>()
    }

    /// Attach the [`ToolProgressSinkHandle`] for the current run.
    /// Long-running tools report phase transitions through
    /// [`Self::record_phase`] / [`Self::record_phase_with`], and the
    /// SDK fans each transition into this sink. With no handle
    /// attached, every emit site is a silent no-op.
    ///
    /// The handle lives in [`Extensions`] under
    /// `ToolProgressSinkHandle`'s `TypeId`, so attaching a second
    /// handle replaces the first (single sink per run by design —
    /// recipes that need fan-out wrap two sinks behind one impl).
    #[must_use]
    pub fn with_tool_progress_sink(self, handle: ToolProgressSinkHandle) -> Self {
        self.add_extension::<ToolProgressSinkHandle>(handle)
    }

    /// Fetch the [`ToolProgressSinkHandle`] for the current run, or
    /// `None` when no sink has been attached.
    #[must_use]
    pub fn tool_progress_sink(&self) -> Option<Arc<ToolProgressSinkHandle>> {
        self.extensions.get::<ToolProgressSinkHandle>()
    }

    /// Report a tool-phase transition that carries no metadata.
    ///
    /// Equivalent to [`Self::record_phase_with`] with a JSON `null`
    /// payload. Silent no-op when no [`ToolProgressSinkHandle`] is
    /// attached or when the call happens outside a tool dispatch (no
    /// [`CurrentToolInvocation`] marker on the context).
    /// Fire-and-forget — sink failures stay inside the sink and never
    /// propagate to the tool body (invariant 4 + 18).
    pub async fn record_phase(&self, phase: impl Into<String> + Send, status: ToolProgressStatus) {
        let no_metadata = serde_json::Value::Null;
        self.record_phase_with(phase, status, no_metadata).await
    }

    /// Emit a tool-phase transition with structured metadata.
    ///
    /// `metadata` flows through to the sink alongside the phase
    /// identity — UIs render percent-complete / item-counts /
    /// partial-result counts from this field. Tools that have nothing
    /// to attach pass [`Self::record_phase`] instead. Fire-and-forget —
    /// sink failures stay inside the sink.
    pub async fn record_phase_with(
        &self,
        phase: impl Into<String> + Send,
        status: ToolProgressStatus,
        metadata: serde_json::Value,
    ) {
        let Some(sink) = self.tool_progress_sink() else {
            return;
        };
        let Some(current) = self.extension::<CurrentToolInvocation>() else {
            return;
        };
        let progress = ToolProgress {
            run_id: self.run_id().map(str::to_owned).unwrap_or_default(),
            tool_use_id: current.tool_use_id().to_owned(),
            tool_name: current.tool_name().to_owned(),
            phase: phase.into(),
            status,
            dispatch_elapsed_ms: current.dispatch_elapsed_ms(),
            metadata,
        };
        sink.inner().record_progress(progress).await;
    }

    /// Attach a typed cross-cutting value to the context's
    /// [`Extensions`] slot. The returned context carries a fresh
    /// `Extensions` Arc with `value` registered under `T`'s
    /// `TypeId`; the caller's previous context is unchanged
    /// (copy-on-write).
    ///
    /// One entry per type — calling this twice with the same `T`
    /// replaces the earlier value. Operators threading multiple
    /// values of the same logical category wrap them in a domain
    /// type (`struct WorkspaceCtx { repo: ..., flags: ... }`) so
    /// `T` stays the canonical key.
    ///
    /// Read back via [`Self::extension`] inside any tool, codec,
    /// or runnable that sees the context.
    ///
    /// **Invariant 10**: do not stash credentials or bearer
    /// tokens here — `ExecutionContext` flows into `Tool::execute`
    /// and an extension carrying a token would surface it to
    /// every tool the agent dispatches. Credentials live in
    /// transports (`CredentialProvider`).
    #[must_use]
    pub fn add_extension<T>(mut self, value: T) -> Self
    where
        T: Send + Sync + 'static,
    {
        self.extensions = self.extensions.inserted(value);
        self
    }

    /// Borrow the cancellation token. Long-running tools should periodically
    /// check `is_cancelled()` and cooperatively shut down.
    pub const fn cancellation(&self) -> &CancellationToken {
        &self.cancellation
    }

    /// Returns the deadline if one was attached.
    pub const fn deadline(&self) -> Option<Instant> {
        self.deadline
    }

    /// Borrow the thread identifier if one was attached.
    pub fn thread_id(&self) -> Option<&str> {
        self.thread_id.as_deref()
    }

    /// Borrow the tenant identifier (invariant 11). Always present —
    /// defaults to the process-shared [`TenantId::default()`] when
    /// not explicitly set.
    pub const fn tenant_id(&self) -> &TenantId {
        &self.tenant_id
    }

    /// Borrow the per-execute correlation id, if one has been
    /// stamped (typically by `Agent::execute` on entry, or by a
    /// caller pre-allocating via [`Self::with_run_id`]).
    pub fn run_id(&self) -> Option<&str> {
        self.run_id.as_deref()
    }

    /// Borrow the parent run id, if this context represents a
    /// sub-agent or tool-driven dispatch under a parent
    /// `Agent::execute`. `None` at the root.
    pub fn parent_run_id(&self) -> Option<&str> {
        self.parent_run_id.as_deref()
    }

    /// Borrow the idempotency key for the current logical call, if
    /// one has been stamped (by `RetryService` on first entry of a
    /// retry loop, or by a caller pre-allocating via
    /// [`Self::with_idempotency_key`]). `DirectTransport` and the
    /// cloud transports forward this on the `Idempotency-Key`
    /// request header so vendors dedupe retries server-side.
    pub fn idempotency_key(&self) -> Option<&str> {
        self.idempotency_key.as_deref()
    }

    /// Borrow the typed cross-cutting carrier. Use
    /// [`Self::extension`] for the common single-type lookup;
    /// reach for the carrier directly only when iterating over
    /// cardinality or composing a downstream context that should
    /// inherit the same set.
    pub const fn extensions(&self) -> &Extensions {
        &self.extensions
    }

    /// Fetch the extension entry registered for `T` as a refcounted
    /// handle that does not borrow from the context. Yields `None`
    /// when no entry of that type exists — callers reading optional
    /// cross-cutting state supply their own default.
    #[must_use]
    pub fn extension<T>(&self) -> Option<Arc<T>>
    where
        T: Send + Sync + 'static,
    {
        self.extensions().get::<T>()
    }

    /// Shorthand for `self.cancellation().is_cancelled()`: `true`
    /// once the cancellation token has fired.
    pub fn is_cancelled(&self) -> bool {
        self.cancellation().is_cancelled()
    }

    /// Derive a scoped child context. The child copies every field
    /// of the parent — `deadline`, `thread_id`, `tenant_id`,
    /// `run_id`, `parent_run_id`, the idempotency key, and the
    /// extensions — except the cancellation token, for which it
    /// holds a *child* token: cancelling the child does NOT cancel
    /// the parent, but cancelling the parent cascades to the child.
    ///
    /// Use for scope-bounded fan-out (e.g. [`scatter`]) where a
    /// fail-fast branch should signal still-running siblings to
    /// abort cooperatively without tearing the whole request down.
    ///
    /// [`scatter`]: ../entelix_graph/fn.scatter.html
    #[must_use]
    pub fn child(&self) -> Self {
        // Exhaustive destructuring: a newly added field must be
        // handled here explicitly before the crate compiles again.
        let Self {
            cancellation,
            deadline,
            thread_id,
            tenant_id,
            run_id,
            parent_run_id,
            idempotency_key,
            extensions,
        } = self;
        Self {
            cancellation: cancellation.child_token(),
            deadline: *deadline,
            thread_id: thread_id.clone(),
            tenant_id: tenant_id.clone(),
            run_id: run_id.clone(),
            parent_run_id: parent_run_id.clone(),
            idempotency_key: idempotency_key.clone(),
            extensions: extensions.clone(),
        }
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod extension_tests {
    use super::*;

    /// Minimal domain payload used to exercise the typed extension
    /// slot.
    #[derive(Debug, PartialEq, Eq)]
    struct WorkspaceCtx {
        repo: &'static str,
    }

    /// The fixture value every test stores and expects to read back.
    const WORKSPACE: WorkspaceCtx = WorkspaceCtx { repo: "entelix" };

    /// A fresh context with the fixture `WorkspaceCtx` attached.
    fn context_with_workspace() -> ExecutionContext {
        ExecutionContext::new().add_extension(WorkspaceCtx { repo: "entelix" })
    }

    /// A new context starts with an empty carrier and no entry for
    /// the fixture type.
    #[test]
    fn fresh_context_has_no_extensions() {
        let fresh = ExecutionContext::new();
        assert!(fresh.extensions().is_empty());
        assert!(fresh.extension::<WorkspaceCtx>().is_none());
    }

    /// A stored value is readable by type and counted once.
    #[test]
    fn add_extension_threads_typed_value() {
        let ctx = context_with_workspace();
        let stored = ctx.extension::<WorkspaceCtx>().unwrap();
        assert_eq!(*stored, WORKSPACE);
        assert_eq!(ctx.extensions().len(), 1);
    }

    /// Extending a clone must not leak the value into the context it
    /// was cloned from.
    #[test]
    fn add_extension_is_copy_on_write() {
        let base = ExecutionContext::new();
        let derived = base
            .clone()
            .add_extension(WorkspaceCtx { repo: "entelix" });
        // The source context stays untouched.
        assert!(base.extension::<WorkspaceCtx>().is_none());
        // The derived context sees the new entry.
        assert!(derived.extension::<WorkspaceCtx>().is_some());
    }

    /// `child()` carries the parent's extensions over.
    #[test]
    fn child_inherits_extensions() {
        let parent = context_with_workspace();
        let scoped = parent.child();
        let inherited = scoped.extension::<WorkspaceCtx>().unwrap();
        assert_eq!(*inherited, WORKSPACE);
    }

    /// The handle returned by `extension` stays valid after the
    /// context itself is gone.
    #[test]
    fn extension_arc_outlives_dropped_context() {
        let ctx = context_with_workspace();
        let handle = ctx.extension::<WorkspaceCtx>().unwrap();
        drop(ctx);
        assert_eq!(*handle, WORKSPACE);
    }
}