Skip to main content

lash_core/runtime/
mod.rs

1mod assembly;
2mod builder;
3pub(crate) mod causal;
4mod clock;
5mod config_ops;
6mod effect;
7mod environment;
8mod error;
9mod host;
10mod in_memory_store;
11mod io;
12mod lifecycle;
13mod observation;
14mod process;
15mod process_work_driver;
16mod process_worker;
17mod queued_work_driver;
18pub mod scenario_contracts;
19mod session_api;
20mod session_execution_lease;
21mod session_manager;
22mod session_ops;
23mod state;
24#[cfg(test)]
25pub(crate) mod tests;
26mod turn_boundary;
27mod turn_commit_draft;
28mod turn_driver;
29mod turn_graph_editor;
30mod turn_input_ingress;
31mod turn_loop;
32mod turn_queue;
33mod usage;
34
35use std::any::Any;
36use std::collections::HashMap;
37use std::fmt;
38use std::sync::Arc;
39use std::sync::Mutex as StdMutex;
40use std::sync::atomic::{AtomicBool, Ordering};
41
42use tokio::sync::{Mutex, mpsc};
43use tokio_util::sync::CancellationToken;
44
45use crate::llm::types::{
46    LlmOutputPart, LlmProviderTraceEvent, LlmProviderTraceSender, LlmRequest, LlmResponse,
47    LlmStreamEvent, LlmUsage,
48};
49use crate::plugin::{
50    CheckpointHookContext, PrepareTurnRequest, SessionConfigChangedContext, SessionRelation,
51};
52use crate::sansio::{LlmCallError, Response};
53use crate::session_model::{
54    Message, MessageRole, Part, PartKind, PruneState, RuntimeSessionPolicy, SessionEvent,
55    SessionPolicy, TokenUsage, fresh_message_id, make_error_event, reassign_part_ids, shared_parts,
56    transport_stream_events,
57};
58use crate::{
59    CheckpointKind, PersistentRuntimeServices, PluginOperationInvokeError, PromptHookContext,
60    RuntimeServices, SandboxMessage, Session, SessionCreateRequest, SessionError, SessionHandle,
61    SessionSnapshot, SessionStartPoint, ToolCallRecord, TurnFinish, TurnOutcome, TurnStop,
62};
63use crate::{Effect, TurnMachine};
64
65use host::*;
66use session_execution_lease::*;
67use session_manager::*;
68use turn_boundary::*;
69use turn_commit_draft::*;
70use turn_driver::*;
71
/// Time-to-live of a runtime turn lease, in milliseconds.
pub(crate) const RUNTIME_TURN_LEASE_TTL_MS: u64 = 30_000;
/// Renewal interval of a runtime turn lease, in milliseconds.
pub(crate) const RUNTIME_TURN_LEASE_RENEW_MS: u64 = 10_000;
// Compile-time guard: pins both values and the 3:1 ratio between the TTL and
// the renew interval, so editing one constant without the other fails the build.
const _: () = {
    assert!(RUNTIME_TURN_LEASE_RENEW_MS == 10_000);
    assert!(RUNTIME_TURN_LEASE_TTL_MS == 30_000);
    assert!(RUNTIME_TURN_LEASE_RENEW_MS * 3 == RUNTIME_TURN_LEASE_TTL_MS);
};
79
80pub(super) fn runtime_error_from_store_commit(err: crate::store::StoreError) -> RuntimeError {
81    match err {
82        crate::store::StoreError::SessionExecutionLeaseExpired { session_id } => RuntimeError::new(
83            RuntimeErrorCode::SessionExecutionLeaseLost,
84            format!("session execution lease for session `{session_id}` was lost before commit"),
85        ),
86        err => RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()),
87    }
88}
89
90// `PromptUsage` is re-exported below alongside the runtime's own types.
91pub use lash_sansio::PromptUsage;
92
93use assembly::{
94    LlmDebugText, LlmDebugToolCall, LlmStreamAccumulator, LlmStreamDebugState, LlmStreamEventLog,
95    LlmStreamState, LlmStreamSummary, TurnAssembler,
96};
97#[cfg(test)]
98#[allow(unused_imports)]
99use assembly::{classify_output_state, sanitize_assistant_output};
100pub use builder::EmbeddedRuntimeBuilder;
101pub use causal::process_event_invocation;
102pub(crate) use causal::tool_retry_sleep_invocation;
103pub use clock::{Clock, SystemClock};
104pub(crate) use effect::RuntimeEffectControllerHandle;
105pub use effect::{
106    AwaitEventKey, AwaitEventWaitIdentity, CausalRef, EffectHost, ExecutionScope,
107    ExternalCompletionError, InlineEffectHost, InlineRuntimeEffectController, LlmAttachmentSpec,
108    LlmRequestSpec, ProcessCommand, ProcessEffectOutcome, Resolution, ResolveOutcome,
109    RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
110    RuntimeEffectEnvelope, RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome,
111    RuntimeInvocation, RuntimeReplay, RuntimeScope, RuntimeSubject, ScopedEffectController,
112    ToolAttemptEffectOutcome, ToolAttemptLaunch, ToolBatchEffectOutcome, ToolCallLaunch,
113};
114pub use environment::{ParkedSession, Residency, RuntimeEnvironment, RuntimeEnvironmentBuilder};
115pub use error::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
116pub use host::{EmbeddedRuntimeHost, ProcessRuntimeHost, RuntimeHostConfig};
117pub use in_memory_store::{InMemorySessionStore, InMemorySessionStoreFactory};
118use io::normalize_input_items;
119pub use observation::{
120    InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
121    LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
122    LiveReplaySubscription, RuntimeHandle, RuntimeObservation, SessionCursor, SessionCursorError,
123    SessionObservation, SessionObservationEvent, SessionObservationEventPayload,
124    SessionObservationSubscription, SessionProcessEventKind, SessionQueueEventKind, SessionResume,
125    SessionRevision,
126};
127#[cfg(any(test, feature = "testing"))]
128pub use process::TestLocalProcessRegistry;
129pub use process::{
130    DefaultProcessCancelAbility, InMemoryProcessExecutionEnvStore, ObservedProcess,
131    ObservedProcessEvent, ObservedWorkItem, PROCESS_LEASE_SCHEMA_VERSION, ProcessAwaitOutput,
132    ProcessCancelAbility, ProcessCancelAllRequest, ProcessCancelRequest, ProcessCancelSource,
133    ProcessCancelSummary, ProcessEngine, ProcessEngineRegistry, ProcessEngineRunContext,
134    ProcessEngineRunGuard, ProcessEngineRuntimeContext, ProcessEngineValidationContext,
135    ProcessEvent, ProcessEventAppendPlan, ProcessEventAppendRequest, ProcessEventAppendResult,
136    ProcessEventSemantics, ProcessEventSemanticsSpec, ProcessEventType, ProcessExecutionContext,
137    ProcessExecutionEnvRef, ProcessExecutionEnvSpec, ProcessExecutionEnvStore, ProcessExternalRef,
138    ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry, ProcessHandleSummary,
139    ProcessId, ProcessIdentity, ProcessInput, ProcessLease, ProcessLeaseCompletion,
140    ProcessLifecycleStatus, ProcessListFilter, ProcessListMode, ProcessOpScope, ProcessOriginator,
141    ProcessProvenance, ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessService,
142    ProcessSessionDeleteReport, ProcessSpawnProvenance, ProcessStartGrant, ProcessStartOptions,
143    ProcessStartRequest, ProcessStatus, ProcessStatusFilter, ProcessTerminalSemantics,
144    ProcessTerminalSpec, ProcessTerminalState, ProcessValueSelector, ProcessWake,
145    ProcessWakeDedupeKey, ProcessWakeDelivery, ProcessWakeDeliveryRequest, ProcessWakeSpec,
146    ProcessWorkObserver, ProcessWorkSnapshot, SessionScope, SessionScopeId,
147    UnavailableProcessService, WaitKind, WaitState, apply_process_status_projection,
148    current_epoch_ms, epoch_ms_from_system_time, load_process_execution_env,
149    materialize_process_event_semantics, persist_process_execution_env,
150    prepare_process_event_append, prepare_process_registration, process_event_payload_hash,
151    process_signal_event_type, process_signal_name_from_event_type, process_signal_wait_key,
152    process_wake_delivery, process_wake_input_from_event_payload, process_wake_turn_cause,
153    process_wake_turn_text, require_event_replay, system_time_from_epoch_ms,
154    validate_process_signal_name,
155};
156pub use process_work_driver::{InlineProcessRunHandle, ProcessRunHandle, ProcessWorkDriver};
157pub use process_worker::{DurableProcessWorker, DurableProcessWorkerConfig};
158pub use queued_work_driver::{QueuedWorkDriver, QueuedWorkRunHandle, QueuedWorkRunRequest};
159pub use scenario_contracts::{RUNTIME_SCENARIO_CONTRACTS, ScenarioContractSpec};
160pub use session_manager::DirectCompletionClient;
161pub use state::RuntimeSessionState;
162use state::{
163    append_session_nodes_to_state_with_clock, apply_residency_on_load, apply_session_checkpoint,
164    apply_session_head, normalize_session_graph, open_agent_frame_in_state_with_clock,
165};
166pub use turn_input_ingress::{
167    PendingTurnInput, PendingTurnInputCancelOutcome, PendingTurnInputCancelResult,
168    PendingTurnInputCancelTarget, PendingTurnInputClaimDiagnostics, PendingTurnInputDraft,
169    PendingTurnInputSuffixCancelOutcome, QueuedCheckpointTurnInput, TURN_INPUT_CLAIM_TTL_MS,
170    TurnInputCheckpointBoundary, TurnInputClaim, TurnInputClaimMode, TurnInputCompletion,
171    TurnInputIngress, TurnInputState,
172};
173pub use turn_loop::ensure_durable_effect_input;
174pub use turn_queue::{
175    DeliveryPolicy, MergeKey, QUEUED_WORK_CLAIM_TTL_MS, QueuedCheckpointWork, QueuedTurnWork,
176    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
177    QueuedWorkClass, QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, SessionCommand,
178    SessionCommandReceipt, SlotPolicy, process_wake_batch_draft,
179};
180pub use usage::{
181    SessionUsageReport, TokenLedgerEntry, UsageReportRow, UsageTotals, diff_token_ledger,
182    diff_usage_reports,
183};
184use usage::{merge_ledger_entry, merge_usage_delta_entries, normalize_prompt_usage};
185
/// Phases of a runtime turn that are reported to a [`RuntimeTurnPhaseProbe`]
/// through its `begin`/`end` callbacks.
///
/// NOTE(review): the variants look like they are declared in execution order —
/// confirm against the turn driver before relying on that.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum RuntimeTurnPhase {
    ContextTransform,
    BeforeTurnHooks,
    PromptBuild,
    EffectLoop,
    FinalizeTurn,
    PersistTurn,
    FinalCommit,
    PostPersistHooks,
}
198
/// Observer that is told when runtime turn phases begin and end.
///
/// `begin`/`end` take a typed [`RuntimeTurnPhase`] and must be implemented.
/// `begin_named`/`end_named` take a free-form phase name and default to doing
/// nothing, so implementors only override them when they care about named
/// phases.
#[doc(hidden)]
pub trait RuntimeTurnPhaseProbe: Send + Sync {
    /// Signals that `phase` is starting.
    fn begin(&self, phase: RuntimeTurnPhase);
    /// Signals that `phase` has finished.
    fn end(&self, phase: RuntimeTurnPhase);
    /// Signals that the named phase is starting. No-op by default.
    fn begin_named(&self, _phase: &str) {}
    /// Signals that the named phase has finished. No-op by default.
    fn end_named(&self, _phase: &str) {}
}
206
207#[doc(hidden)]
208#[derive(Clone, Default)]
209pub struct RuntimeTurnPhaseProbeSlot {
210    probes: Arc<StdMutex<HashMap<crate::SessionScopeId, Arc<dyn RuntimeTurnPhaseProbe>>>>,
211}
212
213impl RuntimeTurnPhaseProbeSlot {
214    pub fn set_for_session(
215        &self,
216        session_id: impl Into<String>,
217        probe: Arc<dyn RuntimeTurnPhaseProbe>,
218    ) {
219        self.set_for_scope(&crate::SessionScope::new(session_id), probe);
220    }
221
222    pub fn set_for_scope(
223        &self,
224        scope: &crate::SessionScope,
225        probe: Arc<dyn RuntimeTurnPhaseProbe>,
226    ) {
227        self.probes
228            .lock()
229            .expect("runtime phase probe slot")
230            .insert(scope.id(), probe);
231    }
232
233    pub fn get_for_scope(
234        &self,
235        scope: &crate::SessionScope,
236    ) -> Option<Arc<dyn RuntimeTurnPhaseProbe>> {
237        let probes = self.probes.lock().expect("runtime phase probe slot");
238        probes.get(&scope.id()).cloned().or_else(|| {
239            probes
240                .get(&crate::SessionScope::new(&scope.session_id).id())
241                .cloned()
242        })
243    }
244}
245
246#[doc(hidden)]
247pub struct RuntimeNamedPhase {
248    probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
249    phase: &'static str,
250}
251
252impl RuntimeNamedPhase {
253    pub fn begin(
254        probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
255        phase: &'static str,
256    ) -> RuntimeNamedPhase {
257        if let Some(probe) = probe.as_ref() {
258            probe.begin_named(phase);
259        }
260        RuntimeNamedPhase { probe, phase }
261    }
262}
263
264impl Drop for RuntimeNamedPhase {
265    fn drop(&mut self) {
266        if let Some(probe) = self.probe.as_ref() {
267            probe.end_named(self.phase);
268        }
269    }
270}
271
272/// Host-provided per-turn input.
273#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
274#[serde(tag = "type", rename_all = "snake_case")]
275pub enum InputItem {
276    Text { text: String },
277    ImageRef { id: String },
278}
279
280impl InputItem {
281    pub fn text(text: impl Into<String>) -> Self {
282        Self::Text { text: text.into() }
283    }
284
285    pub fn image_ref(id: impl Into<String>) -> Self {
286        Self::ImageRef { id: id.into() }
287    }
288}
289
290/// Host-provided per-turn input.
291#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
292pub struct TurnInput {
293    pub items: Vec<InputItem>,
294    #[serde(default)]
295    pub image_blobs: HashMap<String, Vec<u8>>,
296    /// Per-turn override for protocol-owned turn options.
297    #[serde(default, skip_serializing_if = "Option::is_none")]
298    pub protocol_turn_options: Option<crate::ProtocolTurnOptions>,
299    /// Optional externally-stable trace turn id. Normal runtime callers leave
300    /// this empty and the runtime generates one per outer turn.
301    #[serde(default, skip_serializing_if = "Option::is_none")]
302    pub trace_turn_id: Option<String>,
303    #[serde(skip)]
304    pub protocol_extension: Option<ProtocolTurnExtensionHandle>,
305    #[serde(skip)]
306    pub turn_context: TurnContext,
307}
308
309impl TurnInput {
310    pub fn empty() -> Self {
311        Self::items(std::iter::empty())
312    }
313
314    pub fn text(text: impl Into<String>) -> Self {
315        Self::items([InputItem::text(text)])
316    }
317
318    pub fn items(items: impl IntoIterator<Item = InputItem>) -> Self {
319        Self {
320            items: items.into_iter().collect(),
321            image_blobs: HashMap::new(),
322            protocol_turn_options: None,
323            trace_turn_id: None,
324            protocol_extension: None,
325            turn_context: TurnContext::default(),
326        }
327    }
328
329    pub fn with_image_blob(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
330        self.image_blobs.insert(id.into(), bytes);
331        self
332    }
333
334    pub fn with_image_blobs<I, K>(mut self, image_blobs: I) -> Self
335    where
336        I: IntoIterator<Item = (K, Vec<u8>)>,
337        K: Into<String>,
338    {
339        self.image_blobs.extend(
340            image_blobs
341                .into_iter()
342                .map(|(id, bytes)| (id.into(), bytes)),
343        );
344        self
345    }
346
347    pub fn with_image_ref(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
348        let id = id.into();
349        self.items.push(InputItem::image_ref(id.clone()));
350        self.image_blobs.insert(id, bytes);
351        self
352    }
353
354    pub fn with_protocol_turn_options(mut self, options: crate::ProtocolTurnOptions) -> Self {
355        self.protocol_turn_options = Some(options);
356        self
357    }
358
359    pub fn with_trace_turn_id(mut self, trace_turn_id: impl Into<String>) -> Self {
360        self.trace_turn_id = Some(trace_turn_id.into());
361        self
362    }
363}
364
365/// Per-turn, in-process side channel of typed plugin inputs.
366///
367/// This is an `Any`-keyed map of live Rust values handed to plugins for a
368/// single turn. It is deliberately **not** serializable: the values never
369/// survive a process boundary, so durable effect-host runs explicitly reject a
370/// turn that carries any live inputs (see
371/// [`LiveTurnInputs::durable_effect_rejection`]). Durable callers must instead
372/// encode replayable data in
373/// `protocol_turn_options` or persisted plugin state.
374#[derive(Clone, Default)]
375pub struct LiveTurnInputs {
376    inputs: HashMap<&'static str, Arc<dyn Any + Send + Sync>>,
377}
378
379impl LiveTurnInputs {
380    fn insert<T>(&mut self, plugin_id: &'static str, input: T)
381    where
382        T: Send + Sync + 'static,
383    {
384        self.inputs.insert(plugin_id, Arc::new(input));
385    }
386
387    fn get<T>(&self, plugin_id: &'static str) -> Option<&T>
388    where
389        T: 'static,
390    {
391        self.inputs
392            .get(plugin_id)
393            .and_then(|input| input.downcast_ref::<T>())
394    }
395
396    fn contains(&self, plugin_id: &'static str) -> bool {
397        self.inputs.contains_key(plugin_id)
398    }
399
400    pub fn plugin_ids(&self) -> Vec<&'static str> {
401        self.inputs.keys().copied().collect()
402    }
403
404    /// Returns an error when live per-turn inputs would make a durable effect
405    /// host replay depend on process-local values.
406    pub(crate) fn durable_effect_rejection(&self) -> Result<(), RuntimeError> {
407        if self.inputs.is_empty() {
408            return Ok(());
409        }
410        Err(RuntimeError::new(
411            RuntimeErrorCode::DurableEffectLivePluginInput,
412            "durable effect hosts do not support live TurnContext plugin inputs; encode replayable data in protocol_turn_options or persisted plugin state",
413        ))
414    }
415}
416
417#[derive(Clone, Default)]
418pub struct TurnContext {
419    plugin_inputs: LiveTurnInputs,
420    provider: Option<crate::ProviderHandle>,
421    model: Option<crate::ModelSpec>,
422    prompt: crate::PromptLayer,
423}
424
425impl TurnContext {
426    pub fn new() -> Self {
427        Self::default()
428    }
429
430    pub fn insert_plugin_input<T>(&mut self, plugin_id: &'static str, input: T)
431    where
432        T: Send + Sync + 'static,
433    {
434        self.plugin_inputs.insert(plugin_id, input);
435    }
436
437    pub fn set_provider(&mut self, provider: crate::ProviderHandle) {
438        self.provider = Some(provider);
439    }
440
441    pub fn provider(&self) -> Option<&crate::ProviderHandle> {
442        self.provider.as_ref()
443    }
444
445    pub fn set_model(&mut self, model: crate::ModelSpec) {
446        self.model = Some(model);
447    }
448
449    pub fn model_spec(&self) -> Option<&crate::ModelSpec> {
450        self.model.as_ref()
451    }
452
453    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
454    where
455        T: 'static,
456    {
457        self.plugin_inputs.get(plugin_id)
458    }
459
460    pub fn has_plugin_input(&self, plugin_id: &'static str) -> bool {
461        self.plugin_inputs.contains(plugin_id)
462    }
463
464    pub fn has_live_plugin_inputs(&self) -> bool {
465        !self.plugin_inputs.inputs.is_empty()
466    }
467
468    pub fn live_plugin_input_ids(&self) -> Vec<&'static str> {
469        self.plugin_inputs.plugin_ids()
470    }
471
472    /// Live plugin inputs for this turn. The durable boundary inspects this to
473    /// reject turns carrying non-serializable live state.
474    pub(crate) fn live_plugin_inputs(&self) -> &LiveTurnInputs {
475        &self.plugin_inputs
476    }
477
478    pub fn set_prompt_template(&mut self, template: crate::PromptTemplate) {
479        self.prompt.template = Some(template);
480    }
481
482    pub fn add_prompt_contribution(&mut self, contribution: crate::PromptContribution) {
483        self.prompt.add_contribution(contribution);
484    }
485
486    pub fn replace_prompt_slot(
487        &mut self,
488        slot: crate::PromptSlot,
489        contributions: impl IntoIterator<Item = crate::PromptContribution>,
490    ) {
491        self.prompt.replace_slot(slot, contributions);
492    }
493
494    pub fn clear_prompt_slot(&mut self, slot: crate::PromptSlot) {
495        self.prompt.clear_slot(slot);
496    }
497
498    pub fn set_prompt_layer(&mut self, prompt: crate::PromptLayer) {
499        self.prompt = prompt;
500    }
501
502    pub fn prompt_layer(&self) -> &crate::PromptLayer {
503        &self.prompt
504    }
505}
506
507impl fmt::Debug for TurnContext {
508    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
509        f.debug_struct("TurnContext")
510            .field("plugin_inputs", &self.plugin_inputs.plugin_ids())
511            .field("has_provider", &self.provider.is_some())
512            .field("has_model", &self.model.is_some())
513            .field("has_prompt_layer", &(!self.prompt.is_empty()))
514            .finish()
515    }
516}
517
518#[derive(Clone)]
519pub struct ProtocolTurnExtensionHandle(Arc<dyn ProtocolTurnExtension>);
520
521impl ProtocolTurnExtensionHandle {
522    pub fn new(extension: impl ProtocolTurnExtension + 'static) -> Self {
523        Self(Arc::new(extension))
524    }
525
526    pub fn as_any(&self) -> &dyn Any {
527        self.0.as_any()
528    }
529
530    pub fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
531        self.0.prompt_contributions()
532    }
533}
534
535impl fmt::Debug for ProtocolTurnExtensionHandle {
536    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537        f.write_str("ProtocolTurnExtensionHandle(..)")
538    }
539}
540
541pub trait ProtocolTurnExtension: Send + Sync {
542    fn as_any(&self) -> &dyn Any;
543
544    fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
545        Vec::new()
546    }
547}
548
/// Cheaply clonable, type-erased handle to a session-level protocol extension.
#[derive(Clone)]
pub struct ProtocolSessionExtensionHandle(Arc<dyn ProtocolSessionExtension>);

impl ProtocolSessionExtensionHandle {
    /// Wraps `extension` in a shared handle.
    pub fn new(extension: impl ProtocolSessionExtension + 'static) -> Self {
        let shared: Arc<dyn ProtocolSessionExtension> = Arc::new(extension);
        ProtocolSessionExtensionHandle(shared)
    }

    /// Exposes the wrapped extension as `&dyn Any` so callers can downcast it
    /// to its concrete type.
    pub fn as_any(&self) -> &dyn Any {
        let Self(extension) = self;
        extension.as_any()
    }
}

impl fmt::Debug for ProtocolSessionExtensionHandle {
    /// Prints a fixed placeholder; the wrapped extension is not required to
    /// implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ProtocolSessionExtensionHandle(..)")
    }
}

/// Protocol-defined extension attached to a session.
pub trait ProtocolSessionExtension: Send + Sync {
    /// Returns `self` as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any;
}
571
/// Module-internal form of a turn input item: either plain text or an image
/// carried as an attachment reference.
// NOTE(review): presumably produced by `io::normalize_input_items` from
// `InputItem` values — confirm in the `io` module.
#[derive(Clone, Debug)]
pub(super) enum NormalizedItem {
    Text(String),
    Image(crate::AttachmentRef),
}
577
/// Canonical assistant output payload.
///
/// Carries two renderings of the assistant text plus an [`OutputState`]
/// describing how usable the output is.
// NOTE(review): `safe_text` is presumably the sanitized form of `raw_text` —
// confirm against the `assembly` module.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct AssistantOutput {
    pub safe_text: String,
    pub raw_text: String,
    pub state: OutputState,
}
585
/// Quality and usability of assembled terminal output.
///
/// Serialized as a snake_case string (`usable`, `empty_output`,
/// `traceback_only`, `recovered_from_error`).
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OutputState {
    Usable,
    EmptyOutput,
    TracebackOnly,
    RecoveredFromError,
}
595
/// Code execution output observed during a turn.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct CodeOutputRecord {
    /// Output text of the execution.
    pub output: String,
    /// Error text, if any. Omitted from serialized output when `None` and
    /// read as `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
603
/// High-level execution summary for a completed turn.
///
/// Both flags deserialize as `false` when absent.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ExecutionSummary {
    /// Whether the turn included tool calls.
    #[serde(default)]
    pub had_tool_calls: bool,
    /// Whether the turn included code execution.
    #[serde(default)]
    pub had_code_execution: bool,
}
612
/// Structured issue surfaced during turn execution.
///
/// `kind` and `message` are always present. The optional fields are omitted
/// from serialized output when `None` and read as `None` when absent.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct TurnIssue {
    /// Free-form issue category.
    pub kind: String,
    /// Optional issue code.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub code: Option<String>,
    /// Optional LLM terminal reason associated with the issue.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub terminal_reason: Option<crate::LlmTerminalReason>,
    /// Human-readable description.
    pub message: String,
    /// Optional raw payload behind the issue.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub raw: Option<String>,
}
625
/// Canonical high-level turn result returned to hosts.
///
/// `token_usage`, `children_usage`, `tool_calls` and `errors` fall back to
/// their `Default` values when absent from the serialized form.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct AssembledTurn {
    /// Session snapshot carried with the result.
    pub state: SessionSnapshot,
    /// Outcome of the turn.
    pub outcome: crate::TurnOutcome,
    /// Assembled assistant output for the turn.
    pub assistant_output: AssistantOutput,
    /// Summary flags describing what kind of work the turn performed.
    pub execution: ExecutionSummary,
    #[serde(default)]
    pub token_usage: TokenUsage,
    /// Per-(session, source, model) ledger entries for child sessions whose
    /// LLM calls completed during this turn. `token_usage` above is the
    /// parent's own LLM tokens; `total_usage` (on the embed-facing
    /// `TurnResult`) sums both.
    #[serde(default)]
    pub children_usage: Vec<TokenLedgerEntry>,
    /// Tool call records for the turn.
    #[serde(default)]
    pub tool_calls: Vec<ToolCallRecord>,
    /// Issues surfaced while the turn ran.
    #[serde(default)]
    pub errors: Vec<TurnIssue>,
}
646
647/// Result of driving one logical host turn through any AgentFrame switches.
648///
649/// A frame switch is an internal runtime continuation, similar to compaction
650/// from a host's perspective. Callers that need a final answer can use
651/// [`LashRuntime::stream_turn_with_agent_frames`] and inspect `final_turn()`.
652#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
653pub struct AgentFrameRun {
654    pub turns: Vec<AssembledTurn>,
655}
656
657impl AgentFrameRun {
658    pub fn final_turn(&self) -> Option<&AssembledTurn> {
659        self.turns.last()
660    }
661
662    pub fn into_final_turn(mut self) -> Option<AssembledTurn> {
663        self.turns.pop()
664    }
665
666    pub fn frame_switch_count(&self) -> usize {
667        self.turns
668            .iter()
669            .filter(|turn| matches!(turn.outcome, crate::TurnOutcome::AgentFrameSwitch { .. }))
670            .count()
671    }
672}
673
674/// Termination policy knobs.
675#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
676pub struct TerminationPolicy {
677    #[serde(default)]
678    pub treat_missing_done_as_failure: bool,
679}
680
681impl Default for TerminationPolicy {
682    fn default() -> Self {
683        Self {
684            treat_missing_done_as_failure: true,
685        }
686    }
687}
688
/// Host application sink for low-level streaming runtime events.
/// `SessionEvent` is protocol-specific preview/progress data.
#[async_trait::async_trait]
pub trait EventSink: Send + Sync {
    /// Reports whether this sink is a no-op. Defaults to `false`;
    /// [`NoopEventSink`] returns `true`.
    // NOTE(review): presumably lets emitters skip building events nobody will
    // read — confirm at the call sites.
    fn is_noop(&self) -> bool {
        false
    }

    /// Delivers one `event` to the host.
    async fn emit(&self, event: SessionEvent);
}

/// No-op sink useful for callers that only care about final state.
pub struct NoopEventSink;

/// Static no-op event sink for callers that need a `&dyn EventSink` default.
pub static NOOP_EVENT_SINK: NoopEventSink = NoopEventSink;

#[async_trait::async_trait]
impl EventSink for NoopEventSink {
    /// Always `true`: this sink drops everything it is given.
    fn is_noop(&self) -> bool {
        true
    }

    /// Discards `_event`.
    async fn emit(&self, _event: SessionEvent) {}
}
714
715/// Stable identifier for a semantic turn activity.
716#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
717#[serde(transparent)]
718pub struct TurnActivityId(pub String);
719
720impl TurnActivityId {
721    pub fn new(id: impl Into<String>) -> Self {
722        Self(id.into())
723    }
724
725    pub fn fresh() -> Self {
726        Self(uuid::Uuid::new_v4().to_string())
727    }
728}
729
730/// App-facing semantic activity emitted during a turn.
731///
732/// `id` is unique per emitted activity event. `correlation_id` groups related
733/// events in the same logical activity, such as code start/completion, tool
734/// start/completion, or text deltas from one output block.
735#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
736pub struct TurnActivity {
737    pub id: TurnActivityId,
738    pub correlation_id: TurnActivityId,
739    #[serde(flatten)]
740    pub event: TurnEvent,
741}
742
743impl TurnActivity {
744    pub fn new(correlation_id: TurnActivityId, event: TurnEvent) -> Self {
745        Self {
746            id: TurnActivityId::fresh(),
747            correlation_id,
748            event,
749        }
750    }
751
752    pub fn independent(event: TurnEvent) -> Self {
753        let correlation_id = TurnActivityId::fresh();
754        Self::new(correlation_id, event)
755    }
756}
757
/// App-facing semantic event payload for a turn activity.
///
/// Unlike [`SessionEvent`], these events are stable application signals rather
/// than low-level runtime/debug events. Public streams carry these payloads
/// inside [`TurnActivity`] so every emitted item has identity.
///
/// Serialized as an internally tagged value: the `type` field holds the
/// snake_case variant name and the variant's fields sit next to it.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
// The size lint is silenced instead of boxing the larger variants.
#[allow(clippy::large_enum_variant)]
pub enum TurnEvent {
    /// Queued work was claimed at `boundary`.
    QueuedWorkStarted {
        boundary: crate::QueuedWorkClaimBoundary,
        batch_ids: Vec<String>,
        causes: Vec<crate::TurnCause>,
    },
    /// A model request started for the given protocol iteration.
    ModelRequestStarted {
        protocol_iteration: usize,
    },
    /// Incremental chunk of assistant prose.
    AssistantProseDelta {
        text: String,
    },
    /// Incremental chunk of reasoning text.
    ReasoningDelta {
        text: String,
    },
    /// A code block started. `graph_key` is omitted from serialized output
    /// when `None`.
    CodeBlockStarted {
        language: String,
        code: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        graph_key: Option<String>,
    },
    /// A code block finished. `graph_key` is omitted from serialized output
    /// when `None`.
    CodeBlockCompleted {
        language: String,
        output: String,
        error: Option<String>,
        success: bool,
        duration_ms: u64,
        tool_call_ids: Vec<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        graph_key: Option<String>,
    },
    /// A tool call started.
    ToolCallStarted {
        call_id: Option<String>,
        name: String,
        args: serde_json::Value,
    },
    /// A tool call finished with `output`.
    ToolCallCompleted {
        call_id: Option<String>,
        name: String,
        args: serde_json::Value,
        output: crate::ToolCallOutput,
        duration_ms: u64,
    },
    /// Final JSON value of the turn.
    FinalValue {
        value: serde_json::Value,
    },
    /// JSON value attributed to the tool `tool_name`.
    ToolValue {
        tool_name: String,
        value: serde_json::Value,
    },
    /// Token usage for one protocol iteration plus the running total.
    Usage {
        protocol_iteration: usize,
        usage: TokenUsage,
        cumulative: TokenUsage,
    },
    /// Token usage attributed to a child session, keyed by session, source and
    /// model.
    ChildUsage {
        session_id: String,
        source: String,
        model: String,
        protocol_iteration: usize,
        usage: TokenUsage,
        cumulative: TokenUsage,
    },
    /// A retry is pending: which attempt, out of how many, and how long the
    /// wait is.
    RetryStatus {
        wait_seconds: u64,
        attempt: usize,
        max_attempts: usize,
        reason: String,
    },
    /// Runtime event raised by the plugin `plugin_id`.
    PluginRuntime {
        plugin_id: String,
        event: crate::PluginRuntimeEvent,
    },
    /// Queued inputs were accepted at `checkpoint`.
    QueuedInputAccepted {
        checkpoint: crate::CheckpointKind,
        inputs: Vec<crate::AcceptedInjectedTurnInput>,
    },
    /// Queued plugin messages were committed at `checkpoint`.
    QueuedMessagesCommitted {
        messages: Vec<crate::PluginMessage>,
        checkpoint: crate::CheckpointKind,
    },
    /// An error described by `message`.
    Error {
        message: String,
    },
}
851
/// Host application sink for [`TurnActivity`] items emitted during a turn.
#[async_trait::async_trait]
pub trait TurnActivitySink: Send + Sync {
    /// Reports whether this sink is a no-op. Defaults to `false`;
    /// [`NoopTurnActivitySink`] returns `true`.
    // NOTE(review): presumably lets emitters skip building activities nobody
    // will read — confirm at the call sites.
    fn is_noop(&self) -> bool {
        false
    }

    /// Delivers one `activity` to the host.
    async fn emit(&self, activity: TurnActivity);
}

/// Turn-activity sink that discards everything it receives.
pub struct NoopTurnActivitySink;

/// Static no-op turn-activity sink for callers that need a `&dyn TurnActivitySink` default.
pub static NOOP_TURN_ACTIVITY_SINK: NoopTurnActivitySink = NoopTurnActivitySink;

#[async_trait::async_trait]
impl TurnActivitySink for NoopTurnActivitySink {
    /// Always `true`: this sink drops everything it is given.
    fn is_noop(&self) -> bool {
        true
    }

    /// Discards `_activity`.
    async fn emit(&self, _activity: TurnActivity) {}
}
874
875/// Optional sinks and scoped effect controller passed to one of [`LashRuntime`]'s
876/// turn-driving entry points (`stream_turn`,
877/// `stream_turn_with_agent_frames`).
878///
879/// Construct via [`TurnOptions::new`] and chain `with_*` builders. Event sinks
880/// default to no-op sinks. Execution scope is explicit and required at every
881/// runtime boundary that can execute nondeterministic work.
882pub struct TurnOptions<'a> {
883    events: Option<&'a dyn EventSink>,
884    turn_events: Option<&'a dyn TurnActivitySink>,
885    scoped_effect_controller: ScopedEffectController<'a>,
886    cancel: CancellationToken,
887}
888
889impl<'a> TurnOptions<'a> {
890    pub fn new(
891        cancel: CancellationToken,
892        scoped_effect_controller: ScopedEffectController<'a>,
893    ) -> Self {
894        Self {
895            events: None,
896            turn_events: None,
897            scoped_effect_controller,
898            cancel,
899        }
900    }
901
902    pub fn with_events(mut self, events: &'a dyn EventSink) -> Self {
903        self.events = Some(events);
904        self
905    }
906
907    pub fn with_turn_events(mut self, turn_events: &'a dyn TurnActivitySink) -> Self {
908        self.turn_events = Some(turn_events);
909        self
910    }
911
912    pub(crate) fn events_or_noop(&self) -> &'a dyn EventSink {
913        self.events.unwrap_or(&NOOP_EVENT_SINK)
914    }
915
916    pub(crate) fn turn_events_or_noop(&self) -> &'a dyn TurnActivitySink {
917        self.turn_events.unwrap_or(&NOOP_TURN_ACTIVITY_SINK)
918    }
919
920    pub(crate) fn execution_scope_id(&self) -> &str {
921        self.scoped_effect_controller.scope_id()
922    }
923
924    pub(crate) fn scoped_effect_controller(&self) -> ScopedEffectController<'a> {
925        self.scoped_effect_controller.clone()
926    }
927}
928
/// Module-private envelope that carries either a low-level [`SessionEvent`]
/// or an app-facing [`TurnActivity`].
// NOTE(review): presumably the item type of an internal channel that merges
// both event kinds — confirm where it is constructed.
enum RuntimeStreamEvent {
    Session(SessionEvent),
    Turn(TurnActivity),
}
933
/// Parameters handed to a [`SessionStoreFactory`] when it creates or opens the
/// store for a session.
#[derive(Clone)]
pub struct SessionStoreCreateRequest {
    /// Id of the session the store belongs to.
    pub session_id: String,
    /// Relation of this session to other sessions (for example its parent).
    pub relation: SessionRelation,
    /// Policy of the session.
    pub policy: SessionPolicy,
}

impl SessionStoreCreateRequest {
    /// Parent session id as reported by `relation`, if any.
    pub fn parent_session_id(&self) -> Option<&str> {
        self.relation.parent_session_id()
    }
}
946
/// Factory for per-session persistence backends.
///
/// Errors are reported as plain `String` messages.
#[async_trait::async_trait]
pub trait SessionStoreFactory: Send + Sync {
    /// Durability tier the stores produced by this factory provide; defaults to
    /// [`DurabilityTier::Inline`].
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Creates the store for the session described by `request`.
    async fn create_store(
        &self,
        request: &SessionStoreCreateRequest,
    ) -> Result<Arc<dyn crate::store::RuntimePersistence>, String>;

    /// Opens an already existing store for the session described by the
    /// request. The default implementation always returns `Ok(None)`, so
    /// factories must override it to support reopening.
    async fn open_existing_store(
        &self,
        _request: &SessionStoreCreateRequest,
    ) -> Result<Option<Arc<dyn crate::store::RuntimePersistence>>, String> {
        Ok(None)
    }

    /// Deletes whatever this factory persisted for `session_id`.
    async fn delete_session(&self, session_id: &str) -> Result<(), String>;
}
969
/// Generic runtime for CLI or programmatic embedding.
///
/// Every field is `pub(in crate::runtime)`: visible to the `runtime` module
/// and its submodules, and to nothing outside it.
pub struct LashRuntime {
    /// The session driven by this runtime, when one is attached.
    pub(in crate::runtime) session: Option<Session>,
    /// Session policy in effect.
    pub(in crate::runtime) policy: SessionPolicy,
    /// Host this runtime runs against.
    pub(in crate::runtime) host: RuntimeHost,
    /// Services bundle used by this runtime.
    pub(in crate::runtime) services: RuntimeServices,
    /// Runtime-side session state.
    pub(in crate::runtime) state: RuntimeSessionState,
    /// Scope id of this runtime.
    pub(in crate::runtime) runtime_scope_id: Arc<str>,
    /// Lease-owner identity of this runtime.
    pub(in crate::runtime) runtime_lease_owner: crate::LeaseOwnerIdentity,
    /// Shared map from a string key to the handle of a managed session.
    // NOTE(review): the key is presumably the session id — confirm in
    // `session_manager`.
    pub(in crate::runtime) managed_sessions: Arc<Mutex<HashMap<String, RuntimeHandle>>>,
    /// Shared map from a string key to a managed session turn.
    // NOTE(review): the key is presumably the session id — confirm in
    // `session_manager`.
    pub(in crate::runtime) managed_turns: Arc<Mutex<HashMap<String, ManagedSessionTurn>>>,
    /// Protocol-owned turn options for this session.
    pub(in crate::runtime) protocol_turn_options: crate::ProtocolTurnOptions,
    /// Session-scoped token cost ledger. Shared by ALL
    /// `RuntimeSessionServices` instances created from this runtime
    /// (both per-turn and async maintenance). Entries accumulate here
    /// and are drained into `state.token_ledger` at turn-commit time.
    pub(in crate::runtime) shared_token_ledger: Arc<std::sync::Mutex<Vec<TokenLedgerEntry>>>,
    /// Shared flag that records that a process sync is needed.
    // NOTE(review): who sets and who clears this flag is not visible here —
    // confirm at the use sites.
    pub(in crate::runtime) process_sync_needed: Arc<AtomicBool>,
    /// Optional probe notified about turn phases.
    pub(in crate::runtime) turn_phase_probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
    /// Resident-graph policy chosen by the host. Controls whether
    /// [`LashRuntime::refresh_session_graph_from_store`] reloads the full
    /// graph or just the active path, matching the trimming behavior set at
    /// load time via [`apply_residency_on_load`](crate::runtime::apply_residency_on_load).
    pub(in crate::runtime) residency: Residency,
}