Skip to main content

lash_core/runtime/
mod.rs

1mod assembly;
2mod builder;
3pub(crate) mod causal;
4mod clock;
5mod config_ops;
6mod effect;
7mod environment;
8mod error;
9mod host;
10mod in_memory_store;
11mod io;
12mod lifecycle;
13mod observation;
14mod process;
15mod process_work_driver;
16mod process_worker;
17mod queued_work_driver;
18pub mod scenario_contracts;
19mod session_api;
20mod session_execution_lease;
21mod session_manager;
22mod session_ops;
23mod state;
24#[cfg(test)]
25pub(crate) mod tests;
26mod turn_boundary;
27mod turn_commit_draft;
28mod turn_driver;
29mod turn_graph_editor;
30mod turn_input_ingress;
31mod turn_loop;
32mod turn_queue;
33mod usage;
34
35use std::any::Any;
36use std::collections::HashMap;
37use std::fmt;
38use std::sync::Arc;
39use std::sync::Mutex as StdMutex;
40use std::sync::atomic::{AtomicBool, Ordering};
41
42use tokio::sync::{Mutex, mpsc};
43use tokio_util::sync::CancellationToken;
44
45use crate::llm::types::{
46    LlmOutputPart, LlmProviderTraceEvent, LlmProviderTraceSender, LlmRequest, LlmResponse,
47    LlmStreamEvent, LlmUsage,
48};
49use crate::plugin::{
50    CheckpointHookContext, PrepareTurnRequest, SessionConfigChangedContext, SessionRelation,
51};
52use crate::sansio::{LlmCallError, Response};
53use crate::session_model::{
54    Message, MessageRole, Part, PartKind, PruneState, RuntimeSessionPolicy, SessionEvent,
55    SessionPolicy, TokenUsage, fresh_message_id, make_error_event, reassign_part_ids, shared_parts,
56    transport_stream_events,
57};
58use crate::{
59    CheckpointKind, PersistentRuntimeServices, PluginOperationInvokeError, PromptHookContext,
60    RuntimeServices, SandboxMessage, Session, SessionCreateRequest, SessionError, SessionHandle,
61    SessionSnapshot, SessionStartPoint, ToolCallRecord, TurnFinish, TurnOutcome, TurnStop,
62};
63use crate::{Effect, TurnMachine};
64
65use host::*;
66use session_execution_lease::*;
67use session_manager::*;
68use turn_boundary::*;
69use turn_commit_draft::*;
70use turn_driver::*;
71
72pub(super) fn runtime_error_from_store_commit(err: crate::store::StoreError) -> RuntimeError {
73    match err {
74        crate::store::StoreError::SessionExecutionLeaseExpired { session_id } => RuntimeError::new(
75            RuntimeErrorCode::SessionExecutionLeaseLost,
76            format!("session execution lease for session `{session_id}` was lost before commit"),
77        ),
78        err => RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()),
79    }
80}
81
82// `PromptUsage` is re-exported below alongside the runtime's own types.
83pub use lash_sansio::PromptUsage;
84
85use assembly::{
86    LlmDebugText, LlmDebugToolCall, LlmStreamAccumulator, LlmStreamDebugState, LlmStreamEventLog,
87    LlmStreamState, LlmStreamSummary, TurnAssembler,
88};
89#[cfg(test)]
90#[allow(unused_imports)]
91use assembly::{classify_output_state, sanitize_assistant_output};
92pub use builder::EmbeddedRuntimeBuilder;
93pub use causal::process_event_invocation;
94pub(crate) use causal::tool_retry_sleep_invocation;
95pub use clock::{Clock, SystemClock};
96pub(crate) use effect::RuntimeEffectControllerHandle;
97pub use effect::{
98    AwaitEventKey, AwaitEventResolver, AwaitEventWaitIdentity, CausalRef, EffectHost,
99    ExecutionScope, ExternalCompletionError, InlineEffectHost, InlineRuntimeEffectController,
100    LlmAttachmentSpec, LlmRequestSpec, ProcessCommand, ProcessEffectOutcome, Resolution,
101    ResolveOutcome, RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
102    RuntimeEffectEnvelope, RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome,
103    RuntimeInvocation, RuntimeReplay, RuntimeScope, RuntimeSubject, ScopedEffectController,
104    ToolAttemptEffectOutcome, ToolAttemptLaunch, ToolBatchEffectOutcome, ToolCallLaunch,
105};
106pub use environment::{ParkedSession, Residency, RuntimeEnvironment, RuntimeEnvironmentBuilder};
107pub use error::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
108pub use host::{EmbeddedRuntimeHost, ProcessRuntimeHost, RuntimeHostConfig};
109pub use in_memory_store::{InMemorySessionStore, InMemorySessionStoreFactory};
110use io::normalize_input_items;
111pub use observation::{
112    InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
113    LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
114    LiveReplaySubscription, RuntimeHandle, RuntimeObservation, SessionCursor, SessionCursorError,
115    SessionObservation, SessionObservationEvent, SessionObservationEventPayload,
116    SessionObservationSubscription, SessionProcessEventKind, SessionQueueEventKind, SessionResume,
117    SessionRevision,
118};
119#[cfg(any(test, feature = "testing"))]
120pub use process::TestLocalProcessRegistry;
121pub use process::{
122    AbandonEvidence, AbandonRequest, AbandonWriter, DefaultProcessCancelAbility,
123    InMemoryProcessExecutionEnvStore, ObservedProcess, ObservedProcessEvent, ObservedWorkItem,
124    PROCESS_LEASE_SCHEMA_VERSION, ProcessAttach, ProcessAwaitOutput, ProcessAwaiter,
125    ProcessCancelAbility, ProcessCancelAllRequest, ProcessCancelRequest, ProcessCancelSource,
126    ProcessCancelSummary, ProcessChangeHub, ProcessEngine, ProcessEngineRegistry,
127    ProcessEngineRunContext, ProcessEngineRunGuard, ProcessEngineRuntimeContext,
128    ProcessEngineValidationContext, ProcessEvent, ProcessEventAppendPlan,
129    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessEventSemantics,
130    ProcessEventSemanticsSpec, ProcessEventSink, ProcessEventType, ProcessExecutionContext,
131    ProcessExecutionEnvRef, ProcessExecutionEnvSpec, ProcessExecutionEnvStore, ProcessExternalRef,
132    ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry, ProcessHandleSummary,
133    ProcessId, ProcessIdentity, ProcessInput, ProcessLease, ProcessLeaseClaimOutcome,
134    ProcessLeaseCompletion, ProcessLifecycleStatus, ProcessListFilter, ProcessListMode,
135    ProcessOpScope, ProcessOriginator, ProcessProvenance, ProcessPruneReport, ProcessRecord,
136    ProcessRegistration, ProcessRegistry, ProcessService, ProcessSessionDeleteReport,
137    ProcessSpawnProvenance, ProcessStartGrant, ProcessStartOptions, ProcessStartRequest,
138    ProcessStarted, ProcessStatus, ProcessStatusFilter, ProcessTerminalSemantics,
139    ProcessTerminalSpec, ProcessTerminalState, ProcessValueSelector, ProcessWake,
140    ProcessWakeDedupeKey, ProcessWakeDelivery, ProcessWakeDeliveryRequest, ProcessWakeSpec,
141    ProcessWorkObserver, ProcessWorkSnapshot, RecoveryDisposition, SessionScope, SessionScopeId,
142    UnavailableProcessService, WaitKind, WaitState, apply_process_status_projection,
143    current_epoch_ms, epoch_ms_from_system_time, load_process_execution_env,
144    materialize_process_event_semantics, persist_process_execution_env,
145    prepare_process_event_append, prepare_process_registration, process_event_payload_hash,
146    process_signal_event_type, process_signal_name_from_event_type, process_signal_wait_key,
147    process_wake_delivery, process_wake_input_from_event_payload, process_wake_turn_cause,
148    process_wake_turn_text, require_event_replay, system_time_from_epoch_ms,
149    validate_process_signal_name, watch_process_registry, watch_process_registry_with_sink,
150};
151pub use process_work_driver::{InlineProcessRunHandle, ProcessRunHandle, ProcessWorkDriver};
152pub use process_worker::{DurableProcessWorker, DurableProcessWorkerConfig, ProcessDrainReport};
153pub use queued_work_driver::{QueuedWorkDriver, QueuedWorkRunHandle, QueuedWorkRunRequest};
154pub use scenario_contracts::{RUNTIME_SCENARIO_CONTRACTS, ScenarioContractSpec};
155pub use session_manager::DirectCompletionClient;
156pub use state::RuntimeSessionState;
157use state::{
158    append_session_nodes_to_state_with_clock, apply_residency_on_load, apply_session_checkpoint,
159    apply_session_head, normalize_session_graph, open_agent_frame_in_state_with_clock,
160};
161pub use turn_input_ingress::{
162    PendingTurnInput, PendingTurnInputCancelOutcome, PendingTurnInputCancelResult,
163    PendingTurnInputCancelTarget, PendingTurnInputClaimDiagnostics, PendingTurnInputDraft,
164    PendingTurnInputSuffixCancelOutcome, QueuedCheckpointTurnInput, TurnInputCheckpointBoundary,
165    TurnInputClaim, TurnInputClaimMode, TurnInputCompletion, TurnInputIngress, TurnInputState,
166};
167pub use turn_loop::ensure_durable_effect_input;
168pub use turn_queue::{
169    DeliveryPolicy, MergeKey, QueuedCheckpointWork, QueuedTurnWork, QueuedWorkBatch,
170    QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary, QueuedWorkClass,
171    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, SessionCommand, SessionCommandReceipt,
172    SlotPolicy, process_wake_batch_draft,
173};
174pub use usage::{
175    SessionUsageReport, TokenLedgerEntry, UsageReportRow, UsageTotals, diff_token_ledger,
176    diff_usage_reports,
177};
178use usage::{merge_ledger_entry, merge_usage_delta_entries, normalize_prompt_usage};
179
/// Typed turn phases passed to [`RuntimeTurnPhaseProbe::begin`] and
/// [`RuntimeTurnPhaseProbe::end`].
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RuntimeTurnPhase {
    ContextTransform,
    BeforeTurnHooks,
    PromptBuild,
    EffectLoop,
    FinalizeTurn,
    PersistTurn,
    FinalCommit,
    PostPersistHooks,
}

/// Observer for turn-phase boundaries.
///
/// `begin`/`end` receive typed [`RuntimeTurnPhase`] markers and must be
/// implemented. The `*_named` hooks receive free-form phase names and are
/// no-ops unless an implementor overrides them.
#[doc(hidden)]
pub trait RuntimeTurnPhaseProbe: Send + Sync {
    /// Invoked when `phase` begins.
    fn begin(&self, phase: RuntimeTurnPhase);

    /// Invoked when `phase` ends.
    fn end(&self, phase: RuntimeTurnPhase);

    /// Invoked when a named phase begins; does nothing by default.
    fn begin_named(&self, _phase: &str) {}

    /// Invoked when a named phase ends; does nothing by default.
    fn end_named(&self, _phase: &str) {}
}
200
201#[doc(hidden)]
202#[derive(Clone, Default)]
203pub struct RuntimeTurnPhaseProbeSlot {
204    probes: Arc<StdMutex<HashMap<crate::SessionScopeId, Arc<dyn RuntimeTurnPhaseProbe>>>>,
205}
206
207impl RuntimeTurnPhaseProbeSlot {
208    pub fn set_for_session(
209        &self,
210        session_id: impl Into<String>,
211        probe: Arc<dyn RuntimeTurnPhaseProbe>,
212    ) {
213        self.set_for_scope(&crate::SessionScope::new(session_id), probe);
214    }
215
216    pub fn set_for_scope(
217        &self,
218        scope: &crate::SessionScope,
219        probe: Arc<dyn RuntimeTurnPhaseProbe>,
220    ) {
221        self.probes
222            .lock()
223            .expect("runtime phase probe slot")
224            .insert(scope.id(), probe);
225    }
226
227    pub fn get_for_scope(
228        &self,
229        scope: &crate::SessionScope,
230    ) -> Option<Arc<dyn RuntimeTurnPhaseProbe>> {
231        let probes = self.probes.lock().expect("runtime phase probe slot");
232        probes.get(&scope.id()).cloned().or_else(|| {
233            probes
234                .get(&crate::SessionScope::new(&scope.session_id).id())
235                .cloned()
236        })
237    }
238}
239
240#[doc(hidden)]
241pub struct RuntimeNamedPhase {
242    probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
243    phase: &'static str,
244}
245
246impl RuntimeNamedPhase {
247    pub fn begin(
248        probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
249        phase: &'static str,
250    ) -> RuntimeNamedPhase {
251        if let Some(probe) = probe.as_ref() {
252            probe.begin_named(phase);
253        }
254        RuntimeNamedPhase { probe, phase }
255    }
256}
257
258impl Drop for RuntimeNamedPhase {
259    fn drop(&mut self) {
260        if let Some(probe) = self.probe.as_ref() {
261            probe.end_named(self.phase);
262        }
263    }
264}
265
266/// Host-provided per-turn input.
267#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
268#[serde(tag = "type", rename_all = "snake_case")]
269pub enum InputItem {
270    Text { text: String },
271    ImageRef { id: String },
272}
273
274impl InputItem {
275    pub fn text(text: impl Into<String>) -> Self {
276        Self::Text { text: text.into() }
277    }
278
279    pub fn image_ref(id: impl Into<String>) -> Self {
280        Self::ImageRef { id: id.into() }
281    }
282}
283
284/// Host-provided per-turn input.
285#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
286pub struct TurnInput {
287    pub items: Vec<InputItem>,
288    #[serde(default)]
289    pub image_blobs: HashMap<String, Vec<u8>>,
290    /// Per-turn override for protocol-owned turn options.
291    #[serde(default, skip_serializing_if = "Option::is_none")]
292    pub protocol_turn_options: Option<crate::ProtocolTurnOptions>,
293    /// Optional externally-stable trace turn id. Normal runtime callers leave
294    /// this empty and the runtime generates one per outer turn.
295    #[serde(default, skip_serializing_if = "Option::is_none")]
296    pub trace_turn_id: Option<String>,
297    #[serde(skip)]
298    pub protocol_extension: Option<ProtocolTurnExtensionHandle>,
299    #[serde(skip)]
300    pub turn_context: TurnContext,
301}
302
303impl TurnInput {
304    pub fn empty() -> Self {
305        Self::items(std::iter::empty())
306    }
307
308    pub fn text(text: impl Into<String>) -> Self {
309        Self::items([InputItem::text(text)])
310    }
311
312    pub fn items(items: impl IntoIterator<Item = InputItem>) -> Self {
313        Self {
314            items: items.into_iter().collect(),
315            image_blobs: HashMap::new(),
316            protocol_turn_options: None,
317            trace_turn_id: None,
318            protocol_extension: None,
319            turn_context: TurnContext::default(),
320        }
321    }
322
323    pub fn with_image_blob(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
324        self.image_blobs.insert(id.into(), bytes);
325        self
326    }
327
328    pub fn with_image_blobs<I, K>(mut self, image_blobs: I) -> Self
329    where
330        I: IntoIterator<Item = (K, Vec<u8>)>,
331        K: Into<String>,
332    {
333        self.image_blobs.extend(
334            image_blobs
335                .into_iter()
336                .map(|(id, bytes)| (id.into(), bytes)),
337        );
338        self
339    }
340
341    pub fn with_image_ref(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
342        let id = id.into();
343        self.items.push(InputItem::image_ref(id.clone()));
344        self.image_blobs.insert(id, bytes);
345        self
346    }
347
348    pub fn with_protocol_turn_options(mut self, options: crate::ProtocolTurnOptions) -> Self {
349        self.protocol_turn_options = Some(options);
350        self
351    }
352
353    pub fn with_trace_turn_id(mut self, trace_turn_id: impl Into<String>) -> Self {
354        self.trace_turn_id = Some(trace_turn_id.into());
355        self
356    }
357}
358
359/// Per-turn, in-process side channel of typed plugin inputs.
360///
361/// This is an `Any`-keyed map of live Rust values handed to plugins for a
362/// single turn. It is deliberately **not** serializable: the values never
363/// survive a process boundary, so durable effect-host runs explicitly reject a
364/// turn that carries any live inputs (see
365/// [`LiveTurnInputs::durable_effect_rejection`]). Durable callers must instead
366/// encode replayable data in
367/// `protocol_turn_options` or persisted plugin state.
368#[derive(Clone, Default)]
369pub struct LiveTurnInputs {
370    inputs: HashMap<&'static str, Arc<dyn Any + Send + Sync>>,
371}
372
373impl LiveTurnInputs {
374    fn insert<T>(&mut self, plugin_id: &'static str, input: T)
375    where
376        T: Send + Sync + 'static,
377    {
378        self.inputs.insert(plugin_id, Arc::new(input));
379    }
380
381    fn get<T>(&self, plugin_id: &'static str) -> Option<&T>
382    where
383        T: 'static,
384    {
385        self.inputs
386            .get(plugin_id)
387            .and_then(|input| input.downcast_ref::<T>())
388    }
389
390    fn contains(&self, plugin_id: &'static str) -> bool {
391        self.inputs.contains_key(plugin_id)
392    }
393
394    pub fn plugin_ids(&self) -> Vec<&'static str> {
395        self.inputs.keys().copied().collect()
396    }
397
398    /// Returns an error when live per-turn inputs would make a durable effect
399    /// host replay depend on process-local values.
400    pub(crate) fn durable_effect_rejection(&self) -> Result<(), RuntimeError> {
401        if self.inputs.is_empty() {
402            return Ok(());
403        }
404        Err(RuntimeError::new(
405            RuntimeErrorCode::DurableEffectLivePluginInput,
406            "durable effect hosts do not support live TurnContext plugin inputs; encode replayable data in protocol_turn_options or persisted plugin state",
407        ))
408    }
409}
410
411#[derive(Clone, Default)]
412pub struct TurnContext {
413    plugin_inputs: LiveTurnInputs,
414    provider: Option<crate::ProviderHandle>,
415    model: Option<crate::ModelSpec>,
416    prompt: crate::PromptLayer,
417}
418
419impl TurnContext {
420    pub fn new() -> Self {
421        Self::default()
422    }
423
424    pub fn insert_plugin_input<T>(&mut self, plugin_id: &'static str, input: T)
425    where
426        T: Send + Sync + 'static,
427    {
428        self.plugin_inputs.insert(plugin_id, input);
429    }
430
431    pub fn set_provider(&mut self, provider: crate::ProviderHandle) {
432        self.provider = Some(provider);
433    }
434
435    pub fn provider(&self) -> Option<&crate::ProviderHandle> {
436        self.provider.as_ref()
437    }
438
439    pub fn set_model(&mut self, model: crate::ModelSpec) {
440        self.model = Some(model);
441    }
442
443    pub fn model_spec(&self) -> Option<&crate::ModelSpec> {
444        self.model.as_ref()
445    }
446
447    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
448    where
449        T: 'static,
450    {
451        self.plugin_inputs.get(plugin_id)
452    }
453
454    pub fn has_plugin_input(&self, plugin_id: &'static str) -> bool {
455        self.plugin_inputs.contains(plugin_id)
456    }
457
458    pub fn has_live_plugin_inputs(&self) -> bool {
459        !self.plugin_inputs.inputs.is_empty()
460    }
461
462    pub fn live_plugin_input_ids(&self) -> Vec<&'static str> {
463        self.plugin_inputs.plugin_ids()
464    }
465
466    /// Live plugin inputs for this turn. The durable boundary inspects this to
467    /// reject turns carrying non-serializable live state.
468    pub(crate) fn live_plugin_inputs(&self) -> &LiveTurnInputs {
469        &self.plugin_inputs
470    }
471
472    pub fn set_prompt_template(&mut self, template: crate::PromptTemplate) {
473        self.prompt.template = Some(template);
474    }
475
476    pub fn add_prompt_contribution(&mut self, contribution: crate::PromptContribution) {
477        self.prompt.add_contribution(contribution);
478    }
479
480    pub fn replace_prompt_slot(
481        &mut self,
482        slot: crate::PromptSlot,
483        contributions: impl IntoIterator<Item = crate::PromptContribution>,
484    ) {
485        self.prompt.replace_slot(slot, contributions);
486    }
487
488    pub fn clear_prompt_slot(&mut self, slot: crate::PromptSlot) {
489        self.prompt.clear_slot(slot);
490    }
491
492    pub fn set_prompt_layer(&mut self, prompt: crate::PromptLayer) {
493        self.prompt = prompt;
494    }
495
496    pub fn prompt_layer(&self) -> &crate::PromptLayer {
497        &self.prompt
498    }
499}
500
501impl fmt::Debug for TurnContext {
502    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
503        f.debug_struct("TurnContext")
504            .field("plugin_inputs", &self.plugin_inputs.plugin_ids())
505            .field("has_provider", &self.provider.is_some())
506            .field("has_model", &self.model.is_some())
507            .field("has_prompt_layer", &(!self.prompt.is_empty()))
508            .finish()
509    }
510}
511
512#[derive(Clone)]
513pub struct ProtocolTurnExtensionHandle(Arc<dyn ProtocolTurnExtension>);
514
515impl ProtocolTurnExtensionHandle {
516    pub fn new(extension: impl ProtocolTurnExtension + 'static) -> Self {
517        Self(Arc::new(extension))
518    }
519
520    pub fn as_any(&self) -> &dyn Any {
521        self.0.as_any()
522    }
523
524    pub fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
525        self.0.prompt_contributions()
526    }
527}
528
529impl fmt::Debug for ProtocolTurnExtensionHandle {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        f.write_str("ProtocolTurnExtensionHandle(..)")
532    }
533}
534
535pub trait ProtocolTurnExtension: Send + Sync {
536    fn as_any(&self) -> &dyn Any;
537
538    fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
539        Vec::new()
540    }
541}
542
/// Cheaply cloneable, type-erased handle to a session-level protocol
/// extension.
///
/// Clones share one `Arc`-owned extension. `Debug` prints an opaque
/// placeholder because [`ProtocolSessionExtension`] does not require `Debug`.
#[derive(Clone)]
pub struct ProtocolSessionExtensionHandle(Arc<dyn ProtocolSessionExtension>);

impl ProtocolSessionExtensionHandle {
    /// Wraps `extension` in a shared handle.
    pub fn new(extension: impl ProtocolSessionExtension + 'static) -> Self {
        let shared: Arc<dyn ProtocolSessionExtension> = Arc::new(extension);
        Self(shared)
    }

    /// The wrapped extension as `&dyn Any`, for downcasting.
    pub fn as_any(&self) -> &dyn Any {
        let Self(inner) = self;
        inner.as_any()
    }
}

impl fmt::Debug for ProtocolSessionExtensionHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ProtocolSessionExtensionHandle(..)")
    }
}

/// Protocol-specific extension scoped to a session.
pub trait ProtocolSessionExtension: Send + Sync {
    /// Exposes the extension as `&dyn Any`; implementations usually return
    /// `self` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
}
565
566#[derive(Clone, Debug)]
567pub(super) enum NormalizedItem {
568    Text(String),
569    Image(crate::AttachmentRef),
570}
571
572/// Canonical assistant output payload.
573#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
574pub struct AssistantOutput {
575    pub safe_text: String,
576    pub raw_text: String,
577    pub state: OutputState,
578}
579
580/// Quality and usability of assembled terminal output.
581#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
582#[serde(rename_all = "snake_case")]
583pub enum OutputState {
584    Usable,
585    EmptyOutput,
586    TracebackOnly,
587    RecoveredFromError,
588}
589
590/// Code execution output observed during a turn.
591#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
592pub struct CodeOutputRecord {
593    pub output: String,
594    #[serde(default, skip_serializing_if = "Option::is_none")]
595    pub error: Option<String>,
596}
597
598/// High-level execution summary for a completed turn.
599#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
600pub struct ExecutionSummary {
601    #[serde(default)]
602    pub had_tool_calls: bool,
603    #[serde(default)]
604    pub had_code_execution: bool,
605    /// Wall-clock turn start as epoch milliseconds, read from the runtime
606    /// [`Clock`]. The measurement window opens when the runtime starts
607    /// claiming the turn (session-execution lease / queued-work claim), so
608    /// it covers the whole host-visible turn. `0` when the turn predates
609    /// this field.
610    #[serde(default)]
611    pub started_at_ms: u64,
612    /// Whole-turn duration in milliseconds — claim through final commit and
613    /// post-persist hooks — measured on the runtime [`Clock`]'s monotonic
614    /// source. `0` when the turn predates this field.
615    #[serde(default)]
616    pub duration_ms: u64,
617}
618
619/// Structured issue surfaced during turn execution.
620#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
621pub struct TurnIssue {
622    pub kind: String,
623    #[serde(default, skip_serializing_if = "Option::is_none")]
624    pub code: Option<String>,
625    #[serde(default, skip_serializing_if = "Option::is_none")]
626    pub terminal_reason: Option<crate::LlmTerminalReason>,
627    pub message: String,
628    #[serde(default, skip_serializing_if = "Option::is_none")]
629    pub raw: Option<String>,
630    /// Whether the failing operation is safe to retry, when the source
631    /// carried a typed signal (provider transports classify retryability;
632    /// terminal LLM responses are deterministic and report `Some(false)`).
633    /// `None` means the source did not know.
634    #[serde(default, skip_serializing_if = "Option::is_none")]
635    pub retryable: Option<bool>,
636    /// Typed provider-failure classification, present only when the issue
637    /// came from a classified LLM provider/transport failure.
638    #[serde(default, skip_serializing_if = "Option::is_none")]
639    pub provider_failure_kind: Option<crate::ProviderFailureKind>,
640}
641
642/// Canonical high-level turn result returned to hosts.
643#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
644pub struct AssembledTurn {
645    pub state: SessionSnapshot,
646    pub outcome: crate::TurnOutcome,
647    pub assistant_output: AssistantOutput,
648    pub execution: ExecutionSummary,
649    #[serde(default)]
650    pub token_usage: TokenUsage,
651    /// Per-(session, source, model) ledger entries for child sessions whose
652    /// LLM calls completed during this turn. `token_usage` above is the
653    /// parent's own LLM tokens; `total_usage` (on the embed-facing
654    /// `TurnResult`) sums both.
655    #[serde(default)]
656    pub children_usage: Vec<TokenLedgerEntry>,
657    #[serde(default)]
658    pub tool_calls: Vec<ToolCallRecord>,
659    #[serde(default)]
660    pub errors: Vec<TurnIssue>,
661}
662
663/// Result of driving one logical host turn through any AgentFrame switches.
664///
665/// A frame switch is an internal runtime continuation, similar to compaction
666/// from a host's perspective. Callers that need a final answer can use
667/// [`LashRuntime::stream_turn_with_agent_frames`] and inspect `final_turn()`.
668#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
669pub struct AgentFrameRun {
670    pub turns: Vec<AssembledTurn>,
671}
672
673impl AgentFrameRun {
674    pub fn final_turn(&self) -> Option<&AssembledTurn> {
675        self.turns.last()
676    }
677
678    pub fn into_final_turn(mut self) -> Option<AssembledTurn> {
679        self.turns.pop()
680    }
681
682    pub fn frame_switch_count(&self) -> usize {
683        self.turns
684            .iter()
685            .filter(|turn| matches!(turn.outcome, crate::TurnOutcome::AgentFrameSwitch { .. }))
686            .count()
687    }
688}
689
690/// Termination policy knobs.
691#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
692pub struct TerminationPolicy {
693    #[serde(default)]
694    pub treat_missing_done_as_failure: bool,
695}
696
697impl Default for TerminationPolicy {
698    fn default() -> Self {
699        Self {
700            treat_missing_done_as_failure: true,
701        }
702    }
703}
704
705/// Host application sink for low-level streaming runtime events.
706/// `SessionEvent` is protocol-specific preview/progress data.
707#[async_trait::async_trait]
708pub trait EventSink: Send + Sync {
709    fn is_noop(&self) -> bool {
710        false
711    }
712
713    async fn emit(&self, event: SessionEvent);
714}
715
716/// No-op sink useful for callers that only care about final state.
717pub struct NoopEventSink;
718
719/// Static no-op event sink for callers that need a `&dyn EventSink` default.
720pub static NOOP_EVENT_SINK: NoopEventSink = NoopEventSink;
721
722#[async_trait::async_trait]
723impl EventSink for NoopEventSink {
724    fn is_noop(&self) -> bool {
725        true
726    }
727
728    async fn emit(&self, _event: SessionEvent) {}
729}
730
731/// Stable identifier for a semantic turn activity.
732#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
733#[serde(transparent)]
734pub struct TurnActivityId(pub String);
735
736impl TurnActivityId {
737    pub fn new(id: impl Into<String>) -> Self {
738        Self(id.into())
739    }
740
741    pub fn fresh() -> Self {
742        Self(uuid::Uuid::new_v4().to_string())
743    }
744}
745
746/// App-facing semantic activity emitted during a turn.
747///
748/// `id` is unique per emitted activity event. `correlation_id` groups related
749/// events in the same logical activity, such as code start/completion, tool
750/// start/completion, or text deltas from one output block.
751#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
752pub struct TurnActivity {
753    pub id: TurnActivityId,
754    pub correlation_id: TurnActivityId,
755    #[serde(flatten)]
756    pub event: TurnEvent,
757}
758
759impl TurnActivity {
760    pub fn new(correlation_id: TurnActivityId, event: TurnEvent) -> Self {
761        Self {
762            id: TurnActivityId::fresh(),
763            correlation_id,
764            event,
765        }
766    }
767
768    pub fn independent(event: TurnEvent) -> Self {
769        let correlation_id = TurnActivityId::fresh();
770        Self::new(correlation_id, event)
771    }
772}
773
774/// App-facing semantic event payload for a turn activity.
775///
776/// Unlike [`SessionEvent`], these events are stable application signals rather
777/// than low-level runtime/debug events. Public streams carry these payloads
778/// inside [`TurnActivity`] so every emitted item has identity.
779#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
780#[serde(tag = "type", rename_all = "snake_case")]
781#[allow(clippy::large_enum_variant)]
782pub enum TurnEvent {
783    QueuedWorkStarted {
784        boundary: crate::QueuedWorkClaimBoundary,
785        batch_ids: Vec<String>,
786        causes: Vec<crate::TurnCause>,
787    },
788    ModelRequestStarted {
789        protocol_iteration: usize,
790    },
791    AssistantProseDelta {
792        text: String,
793    },
794    ReasoningDelta {
795        text: String,
796    },
797    CodeBlockStarted {
798        language: String,
799        code: String,
800        #[serde(default, skip_serializing_if = "Option::is_none")]
801        graph_key: Option<String>,
802    },
803    CodeBlockCompleted {
804        language: String,
805        output: String,
806        #[serde(default, skip_serializing_if = "Option::is_none")]
807        error: Option<String>,
808        success: bool,
809        duration_ms: u64,
810        tool_call_ids: Vec<String>,
811        #[serde(default, skip_serializing_if = "Option::is_none")]
812        graph_key: Option<String>,
813    },
814    ToolCallStarted {
815        #[serde(default, skip_serializing_if = "Option::is_none")]
816        call_id: Option<String>,
817        name: String,
818        args: serde_json::Value,
819        /// Graph key of the enclosing code block, when this tool call ran
820        /// inside one. `None` when the call did not run inside a code block.
821        #[serde(default, skip_serializing_if = "Option::is_none")]
822        graph_key: Option<String>,
823        /// Call id of the parent batch tool call, when this call is a child of
824        /// a `batch` dispatch. `None` for top-level tool calls.
825        #[serde(default, skip_serializing_if = "Option::is_none")]
826        parent_call_id: Option<String>,
827    },
828    ToolCallCompleted {
829        #[serde(default, skip_serializing_if = "Option::is_none")]
830        call_id: Option<String>,
831        name: String,
832        args: serde_json::Value,
833        output: crate::ToolCallOutput,
834        duration_ms: u64,
835        /// Graph key of the enclosing code block, when this tool call ran
836        /// inside one. `None` when the call did not run inside a code block.
837        #[serde(default, skip_serializing_if = "Option::is_none")]
838        graph_key: Option<String>,
839        /// Call id of the parent batch tool call, when this call is a child of
840        /// a `batch` dispatch. `None` for top-level tool calls.
841        #[serde(default, skip_serializing_if = "Option::is_none")]
842        parent_call_id: Option<String>,
843    },
844    FinalValue {
845        value: serde_json::Value,
846    },
847    ToolValue {
848        tool_name: String,
849        value: serde_json::Value,
850    },
851    Usage {
852        protocol_iteration: usize,
853        usage: TokenUsage,
854        cumulative: TokenUsage,
855    },
856    ChildUsage {
857        session_id: String,
858        source: String,
859        model: String,
860        protocol_iteration: usize,
861        usage: TokenUsage,
862        cumulative: TokenUsage,
863    },
864    RetryStatus {
865        wait_seconds: u64,
866        attempt: usize,
867        max_attempts: usize,
868        reason: String,
869    },
870    PluginRuntime {
871        plugin_id: String,
872        event: crate::PluginRuntimeEvent,
873    },
874    QueuedInputAccepted {
875        checkpoint: crate::CheckpointKind,
876        inputs: Vec<crate::AcceptedInjectedTurnInput>,
877    },
878    QueuedMessagesCommitted {
879        messages: Vec<crate::PluginMessage>,
880        checkpoint: crate::CheckpointKind,
881    },
882    Error {
883        message: String,
884    },
885}
886
/// Receiver for [`TurnActivity`] values.
///
/// Implementors must be `Send + Sync`; `emit` is an async method (declared via
/// `async_trait`).
#[async_trait::async_trait]
pub trait TurnActivitySink: Send + Sync {
    /// Whether this sink discards everything it receives.
    ///
    /// Defaults to `false`; `NoopTurnActivitySink` overrides it to return `true`.
    // NOTE(review): presumably lets producers skip building activities that no
    // one will observe — confirm at the call sites.
    fn is_noop(&self) -> bool {
        false
    }

    /// Delivers one activity to this sink.
    async fn emit(&self, activity: TurnActivity);
}
895
/// A [`TurnActivitySink`] that drops every activity it is given and reports
/// `true` from [`TurnActivitySink::is_noop`].
///
/// Being a field-less unit struct, it is trivially `Copy` and `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopTurnActivitySink;

/// Static no-op turn-activity sink for callers that need a `&dyn TurnActivitySink` default.
pub static NOOP_TURN_ACTIVITY_SINK: NoopTurnActivitySink = NoopTurnActivitySink;
900
901#[async_trait::async_trait]
902impl TurnActivitySink for NoopTurnActivitySink {
903    fn is_noop(&self) -> bool {
904        true
905    }
906
907    async fn emit(&self, _activity: TurnActivity) {}
908}
909
/// Optional sinks and scoped effect controller passed to one of [`LashRuntime`]'s
/// turn-driving entry points (`stream_turn`,
/// `stream_turn_with_agent_frames`).
///
/// Construct via [`TurnOptions::new`] and chain `with_*` builders. Sinks that are
/// never set are stored as `None` and resolved to the shared no-op sinks when
/// read. Execution scope is explicit and required at every runtime boundary that
/// can execute nondeterministic work.
pub struct TurnOptions<'a> {
    /// Session event sink; `None` until `with_events` is called.
    events: Option<&'a dyn EventSink>,
    /// Turn-activity sink; `None` until `with_turn_events` is called.
    turn_events: Option<&'a dyn TurnActivitySink>,
    /// Controller whose scope id is reported as this turn's execution scope id.
    scoped_effect_controller: ScopedEffectController<'a>,
    /// Cancellation token supplied to `TurnOptions::new`.
    cancel: CancellationToken,
}
923
924impl<'a> TurnOptions<'a> {
925    pub fn new(
926        cancel: CancellationToken,
927        scoped_effect_controller: ScopedEffectController<'a>,
928    ) -> Self {
929        Self {
930            events: None,
931            turn_events: None,
932            scoped_effect_controller,
933            cancel,
934        }
935    }
936
937    pub fn with_events(mut self, events: &'a dyn EventSink) -> Self {
938        self.events = Some(events);
939        self
940    }
941
942    pub fn with_turn_events(mut self, turn_events: &'a dyn TurnActivitySink) -> Self {
943        self.turn_events = Some(turn_events);
944        self
945    }
946
947    pub(crate) fn events_or_noop(&self) -> &'a dyn EventSink {
948        self.events.unwrap_or(&NOOP_EVENT_SINK)
949    }
950
951    pub(crate) fn turn_events_or_noop(&self) -> &'a dyn TurnActivitySink {
952        self.turn_events.unwrap_or(&NOOP_TURN_ACTIVITY_SINK)
953    }
954
955    pub(crate) fn execution_scope_id(&self) -> &str {
956        self.scoped_effect_controller.scope_id()
957    }
958
959    pub(crate) fn scoped_effect_controller(&self) -> ScopedEffectController<'a> {
960        self.scoped_effect_controller.clone()
961    }
962}
963
/// Either a session-level event or a turn-level activity.
// NOTE(review): presumably lets both kinds travel through one stream/channel so
// their relative order is preserved — confirm at the producer and consumer.
enum RuntimeStreamEvent {
    /// Wraps a [`SessionEvent`].
    Session(SessionEvent),
    /// Wraps a [`TurnActivity`].
    Turn(TurnActivity),
}
968
/// Request passed to [`SessionStoreFactory::create_store`] and
/// [`SessionStoreFactory::open_existing_store`].
#[derive(Clone)]
pub struct SessionStoreCreateRequest {
    /// Identifier of the session whose store is requested.
    pub session_id: String,
    /// The session's relation; [`SessionStoreCreateRequest::parent_session_id`]
    /// reads the parent id from here.
    pub relation: SessionRelation,
    /// Session policy carried with the request.
    pub policy: SessionPolicy,
}
975
976impl SessionStoreCreateRequest {
977    pub fn parent_session_id(&self) -> Option<&str> {
978        self.relation.parent_session_id()
979    }
980}
981
/// Factory for per-session `RuntimePersistence` stores.
///
/// All fallible operations report errors as plain `String` messages.
#[async_trait::async_trait]
pub trait SessionStoreFactory: Send + Sync {
    /// Durability tier the stores produced by this factory provide; defaults to
    /// [`DurabilityTier::Inline`](crate::DurabilityTier::Inline).
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Creates a store for the session described by `request`.
    async fn create_store(
        &self,
        request: &SessionStoreCreateRequest,
    ) -> Result<Arc<dyn crate::store::RuntimePersistence>, String>;

    /// Opens an already-existing store for `request`, returning `Ok(None)` when
    /// there is none.
    ///
    /// The default implementation always returns `Ok(None)`, so factories that do
    /// not override it never reopen a store.
    async fn open_existing_store(
        &self,
        _request: &SessionStoreCreateRequest,
    ) -> Result<Option<Arc<dyn crate::store::RuntimePersistence>>, String> {
        Ok(None)
    }

    /// Deletes the stored data for `session_id`.
    // NOTE(review): whether deleting an unknown session must be an error or a
    // no-op is left to implementations here — confirm the expected contract.
    async fn delete_session(&self, session_id: &str) -> Result<(), String>;
}
1004
/// Generic runtime for CLI or programmatic embedding.
// NOTE(review): struct fields are dropped in declaration order; check that
// before reordering any of them.
pub struct LashRuntime {
    /// Currently loaded session, if any.
    pub(in crate::runtime) session: Option<Session>,
    /// Session policy held by this runtime.
    pub(in crate::runtime) policy: SessionPolicy,
    /// Host-side handle (`RuntimeHost`).
    pub(in crate::runtime) host: RuntimeHost,
    /// Runtime service bundle.
    pub(in crate::runtime) services: RuntimeServices,
    /// Per-session runtime state; `shared_token_ledger` is drained into its
    /// `token_ledger` at turn commit.
    pub(in crate::runtime) state: RuntimeSessionState,
    /// Identifier of this runtime's scope (`Arc<str>`, so clones are cheap).
    pub(in crate::runtime) runtime_scope_id: Arc<str>,
    /// Lease-owner identity presented by this runtime.
    pub(in crate::runtime) runtime_lease_owner: crate::LeaseOwnerIdentity,
    /// Managed session handles keyed by `String` (presumably the session id —
    /// confirm). Guarded by a tokio `Mutex`.
    pub(in crate::runtime) managed_sessions: Arc<Mutex<HashMap<String, RuntimeHandle>>>,
    /// Managed session turns keyed by `String` (presumably the session id —
    /// confirm). Guarded by a tokio `Mutex`.
    pub(in crate::runtime) managed_turns: Arc<Mutex<HashMap<String, ManagedSessionTurn>>>,
    /// Protocol-owned turn options for this session.
    pub(in crate::runtime) protocol_turn_options: crate::ProtocolTurnOptions,
    /// Session-scoped token cost ledger. Shared by ALL
    /// `RuntimeSessionServices` instances created from this runtime
    /// (both per-turn and async maintenance). Entries accumulate here
    /// and are drained into `state.token_ledger` at turn-commit time.
    ///
    /// This is a `std::sync::Mutex` (unlike the tokio `Mutex` used for
    /// `managed_sessions`), so its guard must not be held across an `.await`.
    pub(in crate::runtime) shared_token_ledger: Arc<std::sync::Mutex<Vec<TokenLedgerEntry>>>,
    /// Shared atomic flag signalling that a process sync is needed.
    // NOTE(review): who sets and who clears this flag is not visible here — confirm.
    pub(in crate::runtime) process_sync_needed: Arc<AtomicBool>,
    /// Optional turn-phase probe; `None` when no probe is installed.
    pub(in crate::runtime) turn_phase_probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
    /// Resident-graph policy chosen by the host. Controls whether
    /// [`LashRuntime::refresh_session_graph_from_store`] reloads the full
    /// graph or just the active path, matching the trimming behavior set at
    /// load time via [`apply_residency_on_load`](crate::runtime::apply_residency_on_load).
    pub(in crate::runtime) residency: Residency,
}