Skip to main content

lash_core/runtime/
mod.rs

1mod assembly;
2mod builder;
3pub(crate) mod causal;
4mod clock;
5mod config_ops;
6mod effect;
7mod environment;
8mod error;
9mod host;
10mod in_memory_store;
11mod io;
12mod lifecycle;
13mod observation;
14mod process;
15mod process_work_driver;
16mod process_worker;
17mod queued_work_driver;
18mod session_api;
19mod session_execution_lease;
20mod session_manager;
21mod session_ops;
22mod state;
23#[cfg(test)]
24pub(crate) mod tests;
25mod turn_boundary;
26mod turn_commit_draft;
27mod turn_driver;
28mod turn_graph_editor;
29mod turn_loop;
30mod turn_queue;
31mod usage;
32
33use std::any::Any;
34use std::collections::HashMap;
35use std::fmt;
36use std::sync::Arc;
37use std::sync::Mutex as StdMutex;
38use std::sync::atomic::{AtomicBool, Ordering};
39
40use tokio::sync::{Mutex, mpsc};
41use tokio_util::sync::CancellationToken;
42
43use crate::llm::types::{
44    LlmOutputPart, LlmProviderTraceEvent, LlmProviderTraceSender, LlmRequest, LlmResponse,
45    LlmStreamEvent, LlmUsage,
46};
47use crate::plugin::{
48    CheckpointHookContext, PrepareTurnRequest, SessionConfigChangedContext, SessionRelation,
49};
50use crate::sansio::{LlmCallError, Response};
51use crate::session_model::{
52    Message, MessageRole, Part, PartKind, PruneState, RuntimeSessionPolicy, SessionEvent,
53    SessionPolicy, TokenUsage, fresh_message_id, make_error_event, reassign_part_ids, shared_parts,
54    transport_stream_events,
55};
56use crate::{
57    CheckpointKind, PersistentRuntimeServices, PluginOperationInvokeError, PromptHookContext,
58    RuntimeServices, SandboxMessage, Session, SessionCreateRequest, SessionError, SessionHandle,
59    SessionSnapshot, SessionStartPoint, ToolCallRecord, TurnFinish, TurnOutcome, TurnStop,
60};
61use crate::{Effect, TurnMachine};
62
63use host::*;
64use session_execution_lease::*;
65use session_manager::*;
66use turn_boundary::*;
67use turn_commit_draft::*;
68use turn_driver::*;
69
/// Runtime turn lease time-to-live, in milliseconds (30 seconds).
pub(crate) const RUNTIME_TURN_LEASE_TTL_MS: u64 = 30_000;
/// Runtime turn lease renewal interval, in milliseconds (10 seconds).
pub(crate) const RUNTIME_TURN_LEASE_RENEW_MS: u64 = 10_000;
72
73pub(super) fn runtime_error_from_store_commit(err: crate::store::StoreError) -> RuntimeError {
74    match err {
75        crate::store::StoreError::SessionExecutionLeaseExpired { session_id } => RuntimeError::new(
76            RuntimeErrorCode::SessionExecutionLeaseLost,
77            format!("session execution lease for session `{session_id}` was lost before commit"),
78        ),
79        err => RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()),
80    }
81}
82
83// `PromptUsage` is re-exported below alongside the runtime's own types.
84pub use lash_sansio::PromptUsage;
85
86use assembly::{
87    LlmDebugText, LlmDebugToolCall, LlmStreamAccumulator, LlmStreamDebugState, LlmStreamEventLog,
88    LlmStreamState, LlmStreamSummary, TurnAssembler,
89};
90#[cfg(test)]
91#[allow(unused_imports)]
92use assembly::{classify_output_state, sanitize_assistant_output};
93pub use builder::EmbeddedRuntimeBuilder;
94pub use causal::process_event_invocation;
95pub(crate) use causal::tool_retry_sleep_invocation;
96pub use clock::{Clock, SystemClock};
97pub(crate) use effect::RuntimeEffectControllerHandle;
98pub use effect::{
99    AwaitEventKey, AwaitEventWaitIdentity, CausalRef, EffectHost, ExecutionScope,
100    ExternalCompletionError, InlineEffectHost, InlineRuntimeEffectController, LlmAttachmentSpec,
101    LlmRequestSpec, ProcessCommand, ProcessEffectOutcome, Resolution, ResolveOutcome,
102    RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
103    RuntimeEffectEnvelope, RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome,
104    RuntimeInvocation, RuntimeReplay, RuntimeScope, RuntimeSubject, ScopedEffectController,
105    ToolAttemptEffectOutcome, ToolAttemptLaunch, ToolBatchEffectOutcome, ToolCallLaunch,
106};
107pub use environment::{ParkedSession, Residency, RuntimeEnvironment, RuntimeEnvironmentBuilder};
108pub use error::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
109pub use host::{EmbeddedRuntimeHost, ProcessRuntimeHost, RuntimeHostConfig};
110pub use in_memory_store::{InMemorySessionStore, InMemorySessionStoreFactory};
111use io::normalize_input_items;
112pub use observation::{
113    InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
114    LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
115    LiveReplaySubscription, RuntimeHandle, RuntimeObservation, SessionCursor, SessionCursorError,
116    SessionObservation, SessionObservationEvent, SessionObservationEventPayload,
117    SessionObservationSubscription, SessionProcessEventKind, SessionQueueEventKind, SessionResume,
118    SessionRevision,
119};
120#[cfg(any(test, feature = "testing"))]
121pub use process::TestLocalProcessRegistry;
122pub use process::{
123    DefaultProcessCancelAbility, InMemoryProcessExecutionEnvStore, ObservedProcess,
124    ObservedProcessEvent, ObservedWorkItem, PROCESS_LEASE_SCHEMA_VERSION, ProcessAwaitOutput,
125    ProcessCancelAbility, ProcessCancelAllRequest, ProcessCancelRequest, ProcessCancelSource,
126    ProcessCancelSummary, ProcessEngine, ProcessEngineRegistry, ProcessEngineRunContext,
127    ProcessEngineRunGuard, ProcessEngineRuntimeContext, ProcessEngineValidationContext,
128    ProcessEvent, ProcessEventAppendPlan, ProcessEventAppendRequest, ProcessEventAppendResult,
129    ProcessEventSemantics, ProcessEventSemanticsSpec, ProcessEventType, ProcessExecutionContext,
130    ProcessExecutionEnvRef, ProcessExecutionEnvSpec, ProcessExecutionEnvStore, ProcessExternalRef,
131    ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry, ProcessHandleSummary,
132    ProcessId, ProcessIdentity, ProcessInput, ProcessLease, ProcessLeaseCompletion,
133    ProcessLifecycleStatus, ProcessListFilter, ProcessListMode, ProcessOpScope, ProcessOriginator,
134    ProcessProvenance, ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessService,
135    ProcessSessionDeleteReport, ProcessSpawnProvenance, ProcessStartGrant, ProcessStartOptions,
136    ProcessStartRequest, ProcessStatus, ProcessStatusFilter, ProcessTerminalSemantics,
137    ProcessTerminalSpec, ProcessTerminalState, ProcessValueSelector, ProcessWake,
138    ProcessWakeDedupeKey, ProcessWakeDelivery, ProcessWakeSpec, ProcessWorkObserver,
139    ProcessWorkSnapshot, SessionScope, SessionScopeId, UnavailableProcessService, WaitKind,
140    WaitState, apply_process_status_projection, current_epoch_ms, epoch_ms_from_system_time,
141    load_process_execution_env, materialize_process_event_semantics, persist_process_execution_env,
142    prepare_process_event_append, prepare_process_registration, process_event_payload_hash,
143    process_signal_event_type, process_signal_name_from_event_type, process_signal_wait_key,
144    process_wake_delivery, process_wake_input_from_event_payload, process_wake_turn_cause,
145    process_wake_turn_text, require_event_replay, system_time_from_epoch_ms,
146    validate_process_signal_name,
147};
148pub use process_work_driver::{InlineProcessRunHandle, ProcessRunHandle, ProcessWorkDriver};
149pub use process_worker::{DurableProcessWorker, DurableProcessWorkerConfig};
150pub use queued_work_driver::{QueuedWorkDriver, QueuedWorkRunHandle, QueuedWorkRunRequest};
151pub use session_manager::DirectCompletionClient;
152pub use state::RuntimeSessionState;
153use state::{
154    append_session_nodes_to_state_with_clock, apply_residency_on_load, apply_session_checkpoint,
155    apply_session_head, normalize_session_graph, open_agent_frame_in_state_with_clock,
156};
157pub use turn_loop::ensure_durable_effect_input;
158pub use turn_queue::{
159    DeliveryPolicy, MergeKey, QUEUED_WORK_CLAIM_TTL_MS, QueuedCheckpointWork, QueuedTurnWork,
160    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
161    QueuedWorkClass, QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, SessionCommand,
162    SessionCommandReceipt, SlotPolicy, process_wake_batch_draft,
163};
164pub use usage::{
165    SessionUsageReport, TokenLedgerEntry, UsageReportRow, UsageTotals, diff_token_ledger,
166    diff_usage_reports,
167};
168use usage::{merge_ledger_entry, merge_usage_delta_entries, normalize_prompt_usage};
169
/// Fixed set of turn phases reported to a [`RuntimeTurnPhaseProbe`] through
/// its `begin` / `end` hooks.
///
/// Hidden from rustdoc; the type is `Copy` and usable as a hash-map key.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RuntimeTurnPhase {
    ContextTransform,
    BeforeTurnHooks,
    PromptBuild,
    EffectLoop,
    FinalizeTurn,
    PersistTurn,
    FinalCommit,
    PostPersistHooks,
}
182
183#[doc(hidden)]
184pub trait RuntimeTurnPhaseProbe: Send + Sync {
185    fn begin(&self, phase: RuntimeTurnPhase);
186    fn end(&self, phase: RuntimeTurnPhase);
187    fn begin_named(&self, _phase: &str) {}
188    fn end_named(&self, _phase: &str) {}
189}
190
191#[doc(hidden)]
192#[derive(Clone, Default)]
193pub struct RuntimeTurnPhaseProbeSlot {
194    probes: Arc<StdMutex<HashMap<crate::SessionScopeId, Arc<dyn RuntimeTurnPhaseProbe>>>>,
195}
196
197impl RuntimeTurnPhaseProbeSlot {
198    pub fn set_for_session(
199        &self,
200        session_id: impl Into<String>,
201        probe: Arc<dyn RuntimeTurnPhaseProbe>,
202    ) {
203        self.set_for_scope(&crate::SessionScope::new(session_id), probe);
204    }
205
206    pub fn set_for_scope(
207        &self,
208        scope: &crate::SessionScope,
209        probe: Arc<dyn RuntimeTurnPhaseProbe>,
210    ) {
211        self.probes
212            .lock()
213            .expect("runtime phase probe slot")
214            .insert(scope.id(), probe);
215    }
216
217    pub fn get_for_scope(
218        &self,
219        scope: &crate::SessionScope,
220    ) -> Option<Arc<dyn RuntimeTurnPhaseProbe>> {
221        let probes = self.probes.lock().expect("runtime phase probe slot");
222        probes.get(&scope.id()).cloned().or_else(|| {
223            probes
224                .get(&crate::SessionScope::new(&scope.session_id).id())
225                .cloned()
226        })
227    }
228}
229
230#[doc(hidden)]
231pub struct RuntimeNamedPhase {
232    probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
233    phase: &'static str,
234}
235
236impl RuntimeNamedPhase {
237    pub fn begin(
238        probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
239        phase: &'static str,
240    ) -> RuntimeNamedPhase {
241        if let Some(probe) = probe.as_ref() {
242            probe.begin_named(phase);
243        }
244        RuntimeNamedPhase { probe, phase }
245    }
246}
247
248impl Drop for RuntimeNamedPhase {
249    fn drop(&mut self) {
250        if let Some(probe) = self.probe.as_ref() {
251            probe.end_named(self.phase);
252        }
253    }
254}
255
256/// Host-provided per-turn input.
257#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
258#[serde(tag = "type", rename_all = "snake_case")]
259pub enum InputItem {
260    Text { text: String },
261    ImageRef { id: String },
262}
263
264impl InputItem {
265    pub fn text(text: impl Into<String>) -> Self {
266        Self::Text { text: text.into() }
267    }
268
269    pub fn image_ref(id: impl Into<String>) -> Self {
270        Self::ImageRef { id: id.into() }
271    }
272}
273
274/// Host-provided per-turn input.
275#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
276pub struct TurnInput {
277    pub items: Vec<InputItem>,
278    #[serde(default)]
279    pub image_blobs: HashMap<String, Vec<u8>>,
280    /// Per-turn override for protocol-owned turn options.
281    #[serde(default, skip_serializing_if = "Option::is_none")]
282    pub protocol_turn_options: Option<crate::ProtocolTurnOptions>,
283    /// Optional externally-stable trace turn id. Normal runtime callers leave
284    /// this empty and the runtime generates one per outer turn.
285    #[serde(default, skip_serializing_if = "Option::is_none")]
286    pub trace_turn_id: Option<String>,
287    #[serde(skip)]
288    pub protocol_extension: Option<ProtocolTurnExtensionHandle>,
289    #[serde(skip)]
290    pub turn_context: TurnContext,
291}
292
293impl TurnInput {
294    pub fn empty() -> Self {
295        Self::items(std::iter::empty())
296    }
297
298    pub fn text(text: impl Into<String>) -> Self {
299        Self::items([InputItem::text(text)])
300    }
301
302    pub fn items(items: impl IntoIterator<Item = InputItem>) -> Self {
303        Self {
304            items: items.into_iter().collect(),
305            image_blobs: HashMap::new(),
306            protocol_turn_options: None,
307            trace_turn_id: None,
308            protocol_extension: None,
309            turn_context: TurnContext::default(),
310        }
311    }
312
313    pub fn with_image_blob(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
314        self.image_blobs.insert(id.into(), bytes);
315        self
316    }
317
318    pub fn with_image_blobs<I, K>(mut self, image_blobs: I) -> Self
319    where
320        I: IntoIterator<Item = (K, Vec<u8>)>,
321        K: Into<String>,
322    {
323        self.image_blobs.extend(
324            image_blobs
325                .into_iter()
326                .map(|(id, bytes)| (id.into(), bytes)),
327        );
328        self
329    }
330
331    pub fn with_image_ref(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
332        let id = id.into();
333        self.items.push(InputItem::image_ref(id.clone()));
334        self.image_blobs.insert(id, bytes);
335        self
336    }
337
338    pub fn with_protocol_turn_options(mut self, options: crate::ProtocolTurnOptions) -> Self {
339        self.protocol_turn_options = Some(options);
340        self
341    }
342
343    pub fn with_trace_turn_id(mut self, trace_turn_id: impl Into<String>) -> Self {
344        self.trace_turn_id = Some(trace_turn_id.into());
345        self
346    }
347}
348
349/// Per-turn, in-process side channel of typed plugin inputs.
350///
351/// This is an `Any`-keyed map of live Rust values handed to plugins for a
352/// single turn. It is deliberately **not** serializable: the values never
353/// survive a process boundary, so durable effect-host runs explicitly reject a
354/// turn that carries any live inputs (see
355/// [`LiveTurnInputs::durable_effect_rejection`]). Durable callers must instead
356/// encode replayable data in
357/// `protocol_turn_options` or persisted plugin state.
358#[derive(Clone, Default)]
359pub struct LiveTurnInputs {
360    inputs: HashMap<&'static str, Arc<dyn Any + Send + Sync>>,
361}
362
363impl LiveTurnInputs {
364    fn insert<T>(&mut self, plugin_id: &'static str, input: T)
365    where
366        T: Send + Sync + 'static,
367    {
368        self.inputs.insert(plugin_id, Arc::new(input));
369    }
370
371    fn get<T>(&self, plugin_id: &'static str) -> Option<&T>
372    where
373        T: 'static,
374    {
375        self.inputs
376            .get(plugin_id)
377            .and_then(|input| input.downcast_ref::<T>())
378    }
379
380    fn contains(&self, plugin_id: &'static str) -> bool {
381        self.inputs.contains_key(plugin_id)
382    }
383
384    fn is_empty(&self) -> bool {
385        self.inputs.is_empty()
386    }
387
388    pub fn plugin_ids(&self) -> Vec<&'static str> {
389        self.inputs.keys().copied().collect()
390    }
391
392    /// Returns an error when live per-turn inputs would make a durable effect
393    /// host replay depend on process-local values.
394    pub(crate) fn durable_effect_rejection(&self) -> Result<(), RuntimeError> {
395        if self.is_empty() {
396            return Ok(());
397        }
398        Err(RuntimeError::new(
399            RuntimeErrorCode::DurableEffectLivePluginInput,
400            "durable effect hosts do not support live TurnContext plugin inputs; encode replayable data in protocol_turn_options or persisted plugin state",
401        ))
402    }
403}
404
405#[derive(Clone, Default)]
406pub struct TurnContext {
407    plugin_inputs: LiveTurnInputs,
408    provider: Option<crate::ProviderHandle>,
409    model: Option<crate::ModelSpec>,
410    prompt: crate::PromptLayer,
411}
412
413impl TurnContext {
414    pub fn new() -> Self {
415        Self::default()
416    }
417
418    pub fn insert_plugin_input<T>(&mut self, plugin_id: &'static str, input: T)
419    where
420        T: Send + Sync + 'static,
421    {
422        self.plugin_inputs.insert(plugin_id, input);
423    }
424
425    pub fn set_provider(&mut self, provider: crate::ProviderHandle) {
426        self.provider = Some(provider);
427    }
428
429    pub fn provider(&self) -> Option<&crate::ProviderHandle> {
430        self.provider.as_ref()
431    }
432
433    pub fn set_model(&mut self, model: crate::ModelSpec) {
434        self.model = Some(model);
435    }
436
437    pub fn model_spec(&self) -> Option<&crate::ModelSpec> {
438        self.model.as_ref()
439    }
440
441    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
442    where
443        T: 'static,
444    {
445        self.plugin_inputs.get(plugin_id)
446    }
447
448    pub fn has_plugin_input(&self, plugin_id: &'static str) -> bool {
449        self.plugin_inputs.contains(plugin_id)
450    }
451
452    pub fn has_live_plugin_inputs(&self) -> bool {
453        !self.plugin_inputs.is_empty()
454    }
455
456    pub fn live_plugin_input_ids(&self) -> Vec<&'static str> {
457        self.plugin_inputs.plugin_ids()
458    }
459
460    /// Live plugin inputs for this turn. The durable boundary inspects this to
461    /// reject turns carrying non-serializable live state.
462    pub(crate) fn live_plugin_inputs(&self) -> &LiveTurnInputs {
463        &self.plugin_inputs
464    }
465
466    pub fn set_prompt_template(&mut self, template: crate::PromptTemplate) {
467        self.prompt.template = Some(template);
468    }
469
470    pub fn add_prompt_contribution(&mut self, contribution: crate::PromptContribution) {
471        self.prompt.add_contribution(contribution);
472    }
473
474    pub fn replace_prompt_slot(
475        &mut self,
476        slot: crate::PromptSlot,
477        contributions: impl IntoIterator<Item = crate::PromptContribution>,
478    ) {
479        self.prompt.replace_slot(slot, contributions);
480    }
481
482    pub fn clear_prompt_slot(&mut self, slot: crate::PromptSlot) {
483        self.prompt.clear_slot(slot);
484    }
485
486    pub fn set_prompt_layer(&mut self, prompt: crate::PromptLayer) {
487        self.prompt = prompt;
488    }
489
490    pub fn prompt_layer(&self) -> &crate::PromptLayer {
491        &self.prompt
492    }
493}
494
495impl fmt::Debug for TurnContext {
496    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
497        f.debug_struct("TurnContext")
498            .field("plugin_inputs", &self.plugin_inputs.plugin_ids())
499            .field("has_provider", &self.provider.is_some())
500            .field("has_model", &self.model.is_some())
501            .field("has_prompt_layer", &(!self.prompt.is_empty()))
502            .finish()
503    }
504}
505
506#[derive(Clone)]
507pub struct ProtocolTurnExtensionHandle(Arc<dyn ProtocolTurnExtension>);
508
509impl ProtocolTurnExtensionHandle {
510    pub fn new(extension: impl ProtocolTurnExtension + 'static) -> Self {
511        Self(Arc::new(extension))
512    }
513
514    pub fn as_any(&self) -> &dyn Any {
515        self.0.as_any()
516    }
517
518    pub fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
519        self.0.prompt_contributions()
520    }
521}
522
523impl fmt::Debug for ProtocolTurnExtensionHandle {
524    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
525        f.write_str("ProtocolTurnExtensionHandle(..)")
526    }
527}
528
529pub trait ProtocolTurnExtension: Send + Sync {
530    fn as_any(&self) -> &dyn Any;
531
532    fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
533        Vec::new()
534    }
535}
536
/// Clonable, type-erased handle to a [`ProtocolSessionExtension`].
///
/// The extension sits behind an `Arc`, so clones share one instance.
#[derive(Clone)]
pub struct ProtocolSessionExtensionHandle(Arc<dyn ProtocolSessionExtension>);

impl ProtocolSessionExtensionHandle {
    /// Wraps `extension` in a shared handle.
    pub fn new(extension: impl ProtocolSessionExtension + 'static) -> Self {
        let shared: Arc<dyn ProtocolSessionExtension> = Arc::new(extension);
        Self(shared)
    }

    /// The wrapped extension as `&dyn Any`, for downcasting to its concrete
    /// type.
    pub fn as_any(&self) -> &dyn Any {
        self.0.as_any()
    }
}

/// Opaque `Debug`: the wrapped extension is not required to implement `Debug`.
impl fmt::Debug for ProtocolSessionExtensionHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ProtocolSessionExtensionHandle(..)")
    }
}

/// Protocol-specific, session-level extension object.
pub trait ProtocolSessionExtension: Send + Sync {
    /// Returns `self` as `&dyn Any` so holders can downcast.
    fn as_any(&self) -> &dyn Any;
}
559
560#[derive(Clone, Debug)]
561pub(super) enum NormalizedItem {
562    Text(String),
563    Image(crate::AttachmentRef),
564}
565
566/// Canonical assistant output payload.
567#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
568pub struct AssistantOutput {
569    pub safe_text: String,
570    pub raw_text: String,
571    pub state: OutputState,
572}
573
574/// Quality and usability of assembled terminal output.
575#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
576#[serde(rename_all = "snake_case")]
577pub enum OutputState {
578    Usable,
579    EmptyOutput,
580    TracebackOnly,
581    RecoveredFromError,
582}
583
584/// Code execution output observed during a turn.
585#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
586pub struct CodeOutputRecord {
587    pub output: String,
588    #[serde(default, skip_serializing_if = "Option::is_none")]
589    pub error: Option<String>,
590}
591
592/// High-level execution summary for a completed turn.
593#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
594pub struct ExecutionSummary {
595    #[serde(default)]
596    pub had_tool_calls: bool,
597    #[serde(default)]
598    pub had_code_execution: bool,
599}
600
601/// Structured issue surfaced during turn execution.
602#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
603pub struct TurnIssue {
604    pub kind: String,
605    #[serde(default, skip_serializing_if = "Option::is_none")]
606    pub code: Option<String>,
607    #[serde(default, skip_serializing_if = "Option::is_none")]
608    pub terminal_reason: Option<crate::LlmTerminalReason>,
609    pub message: String,
610    #[serde(default, skip_serializing_if = "Option::is_none")]
611    pub raw: Option<String>,
612}
613
614/// Canonical high-level turn result returned to hosts.
615#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
616pub struct AssembledTurn {
617    pub state: SessionSnapshot,
618    pub outcome: crate::TurnOutcome,
619    pub assistant_output: AssistantOutput,
620    pub execution: ExecutionSummary,
621    #[serde(default)]
622    pub token_usage: TokenUsage,
623    /// Per-(session, source, model) ledger entries for child sessions whose
624    /// LLM calls completed during this turn. `token_usage` above is the
625    /// parent's own LLM tokens; `total_usage` (on the embed-facing
626    /// `TurnResult`) sums both.
627    #[serde(default)]
628    pub children_usage: Vec<TokenLedgerEntry>,
629    #[serde(default)]
630    pub tool_calls: Vec<ToolCallRecord>,
631    #[serde(default)]
632    pub errors: Vec<TurnIssue>,
633}
634
635/// Result of driving one logical host turn through any AgentFrame switches.
636///
637/// A frame switch is an internal runtime continuation, similar to compaction
638/// from a host's perspective. Callers that need a final answer can use
639/// [`LashRuntime::stream_turn_with_agent_frames`] and inspect `final_turn()`.
640#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
641pub struct AgentFrameRun {
642    pub turns: Vec<AssembledTurn>,
643}
644
645impl AgentFrameRun {
646    pub fn final_turn(&self) -> Option<&AssembledTurn> {
647        self.turns.last()
648    }
649
650    pub fn into_final_turn(mut self) -> Option<AssembledTurn> {
651        self.turns.pop()
652    }
653
654    pub fn frame_switch_count(&self) -> usize {
655        self.turns
656            .iter()
657            .filter(|turn| matches!(turn.outcome, crate::TurnOutcome::AgentFrameSwitch { .. }))
658            .count()
659    }
660}
661
662/// Termination policy knobs.
663#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
664pub struct TerminationPolicy {
665    #[serde(default)]
666    pub treat_missing_done_as_failure: bool,
667}
668
669impl Default for TerminationPolicy {
670    fn default() -> Self {
671        Self {
672            treat_missing_done_as_failure: true,
673        }
674    }
675}
676
677/// Host application sink for low-level streaming runtime events.
678/// `SessionEvent` is protocol-specific preview/progress data.
679#[async_trait::async_trait]
680pub trait EventSink: Send + Sync {
681    fn is_noop(&self) -> bool {
682        false
683    }
684
685    async fn emit(&self, event: SessionEvent);
686}
687
688/// No-op sink useful for callers that only care about final state.
689pub struct NoopEventSink;
690
691/// Static no-op event sink for callers that need a `&dyn EventSink` default.
692pub static NOOP_EVENT_SINK: NoopEventSink = NoopEventSink;
693
694#[async_trait::async_trait]
695impl EventSink for NoopEventSink {
696    fn is_noop(&self) -> bool {
697        true
698    }
699
700    async fn emit(&self, _event: SessionEvent) {}
701}
702
703/// Stable identifier for a semantic turn activity.
704#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
705#[serde(transparent)]
706pub struct TurnActivityId(pub String);
707
708impl TurnActivityId {
709    pub fn new(id: impl Into<String>) -> Self {
710        Self(id.into())
711    }
712
713    pub fn fresh() -> Self {
714        Self(uuid::Uuid::new_v4().to_string())
715    }
716}
717
718/// App-facing semantic activity emitted during a turn.
719///
720/// `id` is unique per emitted activity event. `correlation_id` groups related
721/// events in the same logical activity, such as code start/completion, tool
722/// start/completion, or text deltas from one output block.
723#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
724pub struct TurnActivity {
725    pub id: TurnActivityId,
726    pub correlation_id: TurnActivityId,
727    #[serde(flatten)]
728    pub event: TurnEvent,
729}
730
731impl TurnActivity {
732    pub fn new(correlation_id: TurnActivityId, event: TurnEvent) -> Self {
733        Self {
734            id: TurnActivityId::fresh(),
735            correlation_id,
736            event,
737        }
738    }
739
740    pub fn independent(event: TurnEvent) -> Self {
741        let correlation_id = TurnActivityId::fresh();
742        Self::new(correlation_id, event)
743    }
744}
745
746/// App-facing semantic event payload for a turn activity.
747///
748/// Unlike [`SessionEvent`], these events are stable application signals rather
749/// than low-level runtime/debug events. Public streams carry these payloads
750/// inside [`TurnActivity`] so every emitted item has identity.
751#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
752#[serde(tag = "type", rename_all = "snake_case")]
753#[allow(clippy::large_enum_variant)]
754pub enum TurnEvent {
755    QueuedWorkStarted {
756        boundary: crate::QueuedWorkClaimBoundary,
757        batch_ids: Vec<String>,
758        causes: Vec<crate::TurnCause>,
759    },
760    ModelRequestStarted {
761        protocol_iteration: usize,
762    },
763    AssistantProseDelta {
764        text: String,
765    },
766    ReasoningDelta {
767        text: String,
768    },
769    CodeBlockStarted {
770        language: String,
771        code: String,
772        #[serde(default, skip_serializing_if = "Option::is_none")]
773        graph_key: Option<String>,
774    },
775    CodeBlockCompleted {
776        language: String,
777        output: String,
778        error: Option<String>,
779        success: bool,
780        duration_ms: u64,
781        tool_call_ids: Vec<String>,
782        #[serde(default, skip_serializing_if = "Option::is_none")]
783        graph_key: Option<String>,
784    },
785    ToolCallStarted {
786        call_id: Option<String>,
787        name: String,
788        args: serde_json::Value,
789    },
790    ToolCallCompleted {
791        call_id: Option<String>,
792        name: String,
793        args: serde_json::Value,
794        output: crate::ToolCallOutput,
795        duration_ms: u64,
796    },
797    SubmittedValue {
798        value: serde_json::Value,
799    },
800    ToolValue {
801        tool_name: String,
802        value: serde_json::Value,
803    },
804    Usage {
805        protocol_iteration: usize,
806        usage: TokenUsage,
807        cumulative: TokenUsage,
808    },
809    ChildUsage {
810        session_id: String,
811        source: String,
812        model: String,
813        protocol_iteration: usize,
814        usage: TokenUsage,
815        cumulative: TokenUsage,
816    },
817    RetryStatus {
818        wait_seconds: u64,
819        attempt: usize,
820        max_attempts: usize,
821        reason: String,
822    },
823    PluginRuntime {
824        plugin_id: String,
825        event: crate::PluginRuntimeEvent,
826    },
827    QueuedInputAccepted {
828        checkpoint: crate::CheckpointKind,
829        inputs: Vec<crate::AcceptedInjectedTurnInput>,
830    },
831    QueuedMessagesCommitted {
832        messages: Vec<crate::PluginMessage>,
833        checkpoint: crate::CheckpointKind,
834    },
835    Error {
836        message: String,
837    },
838}
839
840#[async_trait::async_trait]
841pub trait TurnActivitySink: Send + Sync {
842    fn is_noop(&self) -> bool {
843        false
844    }
845
846    async fn emit(&self, activity: TurnActivity);
847}
848
849pub struct NoopTurnActivitySink;
850
851/// Static no-op turn-activity sink for callers that need a `&dyn TurnActivitySink` default.
852pub static NOOP_TURN_ACTIVITY_SINK: NoopTurnActivitySink = NoopTurnActivitySink;
853
854#[async_trait::async_trait]
855impl TurnActivitySink for NoopTurnActivitySink {
856    fn is_noop(&self) -> bool {
857        true
858    }
859
860    async fn emit(&self, _activity: TurnActivity) {}
861}
862
863/// Optional sinks and scoped effect controller passed to one of [`LashRuntime`]'s
864/// turn-driving entry points (`stream_turn`,
865/// `stream_turn_with_agent_frames`).
866///
867/// Construct via [`TurnOptions::new`] and chain `with_*` builders. Event sinks
868/// default to no-op sinks. Execution scope is explicit and required at every
869/// runtime boundary that can execute nondeterministic work.
870pub struct TurnOptions<'a> {
871    events: Option<&'a dyn EventSink>,
872    turn_events: Option<&'a dyn TurnActivitySink>,
873    scoped_effect_controller: ScopedEffectController<'a>,
874    cancel: CancellationToken,
875}
876
877impl<'a> TurnOptions<'a> {
878    pub fn new(
879        cancel: CancellationToken,
880        scoped_effect_controller: ScopedEffectController<'a>,
881    ) -> Self {
882        Self {
883            events: None,
884            turn_events: None,
885            scoped_effect_controller,
886            cancel,
887        }
888    }
889
890    pub fn with_events(mut self, events: &'a dyn EventSink) -> Self {
891        self.events = Some(events);
892        self
893    }
894
895    pub fn with_turn_events(mut self, turn_events: &'a dyn TurnActivitySink) -> Self {
896        self.turn_events = Some(turn_events);
897        self
898    }
899
900    pub(crate) fn events_or_noop(&self) -> &'a dyn EventSink {
901        self.events.unwrap_or(&NOOP_EVENT_SINK)
902    }
903
904    pub(crate) fn turn_events_or_noop(&self) -> &'a dyn TurnActivitySink {
905        self.turn_events.unwrap_or(&NOOP_TURN_ACTIVITY_SINK)
906    }
907
908    pub(crate) fn execution_scope_id(&self) -> &str {
909        self.scoped_effect_controller.scope_id()
910    }
911
912    pub(crate) fn scoped_effect_controller(&self) -> ScopedEffectController<'a> {
913        self.scoped_effect_controller.clone()
914    }
915}
916
917enum RuntimeStreamEvent {
918    Session(SessionEvent),
919    Turn(TurnActivity),
920}
921
922#[derive(Clone)]
923pub struct SessionStoreCreateRequest {
924    pub session_id: String,
925    pub relation: SessionRelation,
926    pub policy: SessionPolicy,
927}
928
929impl SessionStoreCreateRequest {
930    pub fn parent_session_id(&self) -> Option<&str> {
931        self.relation.parent_session_id()
932    }
933}
934
935#[async_trait::async_trait]
936pub trait SessionStoreFactory: Send + Sync {
937    /// Durability tier the stores produced by this factory provide; defaults to
938    /// [`DurabilityTier::Inline`].
939    fn durability_tier(&self) -> crate::DurabilityTier {
940        crate::DurabilityTier::Inline
941    }
942
943    async fn create_store(
944        &self,
945        request: &SessionStoreCreateRequest,
946    ) -> Result<Arc<dyn crate::store::RuntimePersistence>, String>;
947
948    async fn open_existing_store(
949        &self,
950        _request: &SessionStoreCreateRequest,
951    ) -> Result<Option<Arc<dyn crate::store::RuntimePersistence>>, String> {
952        Ok(None)
953    }
954
955    async fn delete_session(&self, session_id: &str) -> Result<(), String>;
956}
957
958/// Generic runtime for CLI or programmatic embedding.
959pub struct LashRuntime {
960    pub(in crate::runtime) session: Option<Session>,
961    pub(in crate::runtime) policy: SessionPolicy,
962    pub(in crate::runtime) host: RuntimeHost,
963    pub(in crate::runtime) services: RuntimeServices,
964    pub(in crate::runtime) state: RuntimeSessionState,
965    pub(in crate::runtime) runtime_scope_id: Arc<str>,
966    pub(in crate::runtime) runtime_lease_owner: crate::LeaseOwnerIdentity,
967    pub(in crate::runtime) managed_sessions: Arc<Mutex<HashMap<String, RuntimeHandle>>>,
968    pub(in crate::runtime) managed_turns: Arc<Mutex<HashMap<String, ManagedSessionTurn>>>,
969    /// Protocol-owned turn options for this session.
970    pub(in crate::runtime) protocol_turn_options: crate::ProtocolTurnOptions,
971    /// Session-scoped token cost ledger. Shared by ALL
972    /// `RuntimeSessionServices` instances created from this runtime
973    /// (both per-turn and async maintenance). Entries accumulate here
974    /// and are drained into `state.token_ledger` at turn-commit time.
975    pub(in crate::runtime) shared_token_ledger: Arc<std::sync::Mutex<Vec<TokenLedgerEntry>>>,
976    pub(in crate::runtime) process_sync_needed: Arc<AtomicBool>,
977    pub(in crate::runtime) turn_phase_probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
978    /// Resident-graph policy chosen by the host. Controls whether
979    /// [`LashRuntime::refresh_session_graph_from_store`] reloads the full
980    /// graph or just the active path, matching the trimming behavior set at
981    /// load time via [`apply_residency_on_load`](crate::runtime::apply_residency_on_load).
982    pub(in crate::runtime) residency: Residency,
983}