Skip to main content

lash_core/runtime/
mod.rs

1mod assembly;
2mod builder;
3pub(crate) mod causal;
4mod clock;
5mod config_ops;
6mod effect;
7mod environment;
8mod error;
9mod host;
10mod in_memory_store;
11mod io;
12mod lifecycle;
13mod observation;
14mod process;
15mod process_work_driver;
16mod process_worker;
17mod queued_work_driver;
18mod session_api;
19mod session_execution_lease;
20mod session_manager;
21mod session_ops;
22mod state;
23#[cfg(test)]
24pub(crate) mod tests;
25mod turn_boundary;
26mod turn_commit_draft;
27mod turn_driver;
28mod turn_graph_editor;
29mod turn_loop;
30mod turn_queue;
31mod usage;
32
33use std::any::Any;
34use std::collections::HashMap;
35use std::fmt;
36use std::sync::Arc;
37use std::sync::Mutex as StdMutex;
38use std::sync::atomic::{AtomicBool, Ordering};
39
40use tokio::sync::{Mutex, mpsc};
41use tokio_util::sync::CancellationToken;
42
43use crate::llm::types::{
44    LlmOutputPart, LlmProviderTraceEvent, LlmProviderTraceSender, LlmRequest, LlmResponse,
45    LlmStreamEvent, LlmUsage,
46};
47use crate::plugin::{
48    CheckpointHookContext, PrepareTurnRequest, SessionConfigChangedContext, SessionRelation,
49};
50use crate::sansio::{LlmCallError, Response};
51use crate::session_model::{
52    Message, MessageRole, Part, PartKind, PruneState, RuntimeSessionPolicy, SessionEvent,
53    SessionPolicy, TokenUsage, fresh_message_id, make_error_event, reassign_part_ids, shared_parts,
54    transport_stream_events,
55};
56use crate::{
57    CheckpointKind, PersistentRuntimeServices, PluginOperationInvokeError, PromptHookContext,
58    RuntimeServices, SandboxMessage, Session, SessionCreateRequest, SessionError, SessionHandle,
59    SessionSnapshot, SessionStartPoint, ToolCallRecord, TurnFinish, TurnOutcome, TurnStop,
60};
61use crate::{Effect, TurnMachine};
62
63use host::*;
64use session_execution_lease::*;
65use session_manager::*;
66use turn_boundary::*;
67use turn_commit_draft::*;
68use turn_driver::*;
69
70pub(crate) const RUNTIME_TURN_LEASE_TTL_MS: u64 = 15 * 60 * 1000;
71
72pub(super) fn runtime_error_from_store_commit(err: crate::store::StoreError) -> RuntimeError {
73    match err {
74        crate::store::StoreError::SessionExecutionLeaseExpired { session_id } => RuntimeError::new(
75            RuntimeErrorCode::SessionExecutionLeaseLost,
76            format!("session execution lease for session `{session_id}` was lost before commit"),
77        ),
78        err => RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()),
79    }
80}
81
82// `PromptUsage` is re-exported below alongside the runtime's own types.
83pub use lash_sansio::PromptUsage;
84
85use assembly::{
86    LlmDebugText, LlmDebugToolCall, LlmStreamAccumulator, LlmStreamDebugState, LlmStreamEventLog,
87    LlmStreamState, LlmStreamSummary, TurnAssembler,
88};
89#[cfg(test)]
90#[allow(unused_imports)]
91use assembly::{classify_output_state, sanitize_assistant_output};
92pub use builder::EmbeddedRuntimeBuilder;
93pub use causal::process_event_invocation;
94pub(crate) use causal::tool_retry_sleep_invocation;
95pub use clock::{Clock, SystemClock};
96pub(crate) use effect::RuntimeEffectControllerHandle;
97pub use effect::{
98    AwaitEventKey, AwaitEventWaitIdentity, CausalRef, EffectHost, ExecutionScope,
99    ExternalCompletionError, InlineEffectHost, InlineRuntimeEffectController, LlmAttachmentSpec,
100    LlmRequestSpec, ProcessCommand, ProcessEffectOutcome, Resolution, ResolveOutcome,
101    RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
102    RuntimeEffectEnvelope, RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome,
103    RuntimeInvocation, RuntimeReplay, RuntimeScope, RuntimeSubject, ScopedEffectController,
104    ToolAttemptEffectOutcome, ToolAttemptLaunch, ToolBatchEffectOutcome, ToolCallLaunch,
105};
106pub use environment::{ParkedSession, Residency, RuntimeEnvironment, RuntimeEnvironmentBuilder};
107pub use error::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
108pub use host::{EmbeddedRuntimeHost, ProcessRuntimeHost, RuntimeHostConfig};
109pub use in_memory_store::{InMemorySessionStore, InMemorySessionStoreFactory};
110use io::normalize_input_items;
111pub use observation::{
112    InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
113    LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
114    LiveReplaySubscription, RuntimeHandle, RuntimeObservation, SessionCursor, SessionCursorError,
115    SessionObservation, SessionObservationEvent, SessionObservationEventPayload,
116    SessionObservationSubscription, SessionProcessEventKind, SessionQueueEventKind, SessionResume,
117    SessionRevision,
118};
119#[cfg(any(test, feature = "testing"))]
120pub use process::TestLocalProcessRegistry;
121pub use process::{
122    DefaultProcessCancelAbility, InMemoryProcessExecutionEnvStore, ObservedProcess,
123    ObservedProcessEvent, ObservedWorkItem, PROCESS_LEASE_SCHEMA_VERSION, ProcessAwaitOutput,
124    ProcessCancelAbility, ProcessCancelAllRequest, ProcessCancelRequest, ProcessCancelSource,
125    ProcessCancelSummary, ProcessEngine, ProcessEngineRegistry, ProcessEngineRunContext,
126    ProcessEngineRunGuard, ProcessEngineRuntimeContext, ProcessEngineValidationContext,
127    ProcessEvent, ProcessEventAppendPlan, ProcessEventAppendRequest, ProcessEventAppendResult,
128    ProcessEventSemantics, ProcessEventSemanticsSpec, ProcessEventType, ProcessExecutionContext,
129    ProcessExecutionEnvRef, ProcessExecutionEnvSpec, ProcessExecutionEnvStore, ProcessExternalRef,
130    ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry, ProcessHandleSummary,
131    ProcessId, ProcessIdentity, ProcessInput, ProcessLease, ProcessLeaseCompletion,
132    ProcessLifecycleStatus, ProcessListFilter, ProcessListMode, ProcessOpScope, ProcessOriginator,
133    ProcessProvenance, ProcessRecord, ProcessRegistration, ProcessRegistry, ProcessService,
134    ProcessSessionDeleteReport, ProcessSpawnProvenance, ProcessStartGrant, ProcessStartOptions,
135    ProcessStartRequest, ProcessStatus, ProcessStatusFilter, ProcessTerminalSemantics,
136    ProcessTerminalSpec, ProcessTerminalState, ProcessValueSelector, ProcessWake,
137    ProcessWakeDedupeKey, ProcessWakeDelivery, ProcessWakeSpec, ProcessWorkObserver,
138    ProcessWorkSnapshot, SessionScope, SessionScopeId, UnavailableProcessService, WaitKind,
139    WaitState, apply_process_status_projection, current_epoch_ms, epoch_ms_from_system_time,
140    load_process_execution_env, materialize_process_event_semantics, persist_process_execution_env,
141    prepare_process_event_append, prepare_process_registration, process_event_payload_hash,
142    process_signal_event_type, process_signal_name_from_event_type, process_signal_wait_key,
143    process_wake_delivery, process_wake_input_from_event_payload, process_wake_turn_cause,
144    process_wake_turn_text, require_event_replay, system_time_from_epoch_ms,
145    validate_process_signal_name,
146};
147pub use process_work_driver::{InlineProcessRunHandle, ProcessRunHandle, ProcessWorkDriver};
148pub use process_worker::{DurableProcessWorker, DurableProcessWorkerConfig};
149pub use queued_work_driver::{QueuedWorkDriver, QueuedWorkRunHandle, QueuedWorkRunRequest};
150pub use session_manager::DirectCompletionClient;
151pub use state::RuntimeSessionState;
152use state::{
153    append_session_nodes_to_state_with_clock, apply_residency_on_load, apply_session_checkpoint,
154    apply_session_head, normalize_session_graph, open_agent_frame_in_state_with_clock,
155};
156pub use turn_loop::ensure_durable_effect_input;
157pub use turn_queue::{
158    DeliveryPolicy, MergeKey, QUEUED_WORK_CLAIM_TTL_MS, QueuedCheckpointWork, QueuedTurnWork,
159    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
160    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, SessionCommand, SessionCommandReceipt,
161    SlotPolicy, process_wake_batch_draft,
162};
163pub use usage::{
164    SessionUsageReport, TokenLedgerEntry, UsageReportRow, UsageTotals, diff_token_ledger,
165    diff_usage_reports,
166};
167use usage::{merge_ledger_entry, merge_usage_delta_entries, normalize_prompt_usage};
168
169#[doc(hidden)]
170#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
171pub enum RuntimeTurnPhase {
172    ContextTransform,
173    BeforeTurnHooks,
174    PromptBuild,
175    EffectLoop,
176    FinalizeTurn,
177    PersistTurn,
178    FinalCommit,
179    PostPersistHooks,
180}
181
182#[doc(hidden)]
183pub trait RuntimeTurnPhaseProbe: Send + Sync {
184    fn begin(&self, phase: RuntimeTurnPhase);
185    fn end(&self, phase: RuntimeTurnPhase);
186    fn begin_named(&self, _phase: &str) {}
187    fn end_named(&self, _phase: &str) {}
188}
189
190#[doc(hidden)]
191#[derive(Clone, Default)]
192pub struct RuntimeTurnPhaseProbeSlot {
193    probes: Arc<StdMutex<HashMap<crate::SessionScopeId, Arc<dyn RuntimeTurnPhaseProbe>>>>,
194}
195
196impl RuntimeTurnPhaseProbeSlot {
197    pub fn set_for_session(
198        &self,
199        session_id: impl Into<String>,
200        probe: Arc<dyn RuntimeTurnPhaseProbe>,
201    ) {
202        self.set_for_scope(&crate::SessionScope::new(session_id), probe);
203    }
204
205    pub fn set_for_scope(
206        &self,
207        scope: &crate::SessionScope,
208        probe: Arc<dyn RuntimeTurnPhaseProbe>,
209    ) {
210        self.probes
211            .lock()
212            .expect("runtime phase probe slot")
213            .insert(scope.id(), probe);
214    }
215
216    pub fn get_for_scope(
217        &self,
218        scope: &crate::SessionScope,
219    ) -> Option<Arc<dyn RuntimeTurnPhaseProbe>> {
220        let probes = self.probes.lock().expect("runtime phase probe slot");
221        probes.get(&scope.id()).cloned().or_else(|| {
222            probes
223                .get(&crate::SessionScope::new(&scope.session_id).id())
224                .cloned()
225        })
226    }
227}
228
229#[doc(hidden)]
230pub struct RuntimeNamedPhase {
231    probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
232    phase: &'static str,
233}
234
235impl RuntimeNamedPhase {
236    pub fn begin(
237        probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
238        phase: &'static str,
239    ) -> RuntimeNamedPhase {
240        if let Some(probe) = probe.as_ref() {
241            probe.begin_named(phase);
242        }
243        RuntimeNamedPhase { probe, phase }
244    }
245}
246
247impl Drop for RuntimeNamedPhase {
248    fn drop(&mut self) {
249        if let Some(probe) = self.probe.as_ref() {
250            probe.end_named(self.phase);
251        }
252    }
253}
254
255/// Host-provided per-turn input.
256#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
257#[serde(tag = "type", rename_all = "snake_case")]
258pub enum InputItem {
259    Text { text: String },
260    ImageRef { id: String },
261}
262
263impl InputItem {
264    pub fn text(text: impl Into<String>) -> Self {
265        Self::Text { text: text.into() }
266    }
267
268    pub fn image_ref(id: impl Into<String>) -> Self {
269        Self::ImageRef { id: id.into() }
270    }
271}
272
273/// Host-provided per-turn input.
274#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
275pub struct TurnInput {
276    pub items: Vec<InputItem>,
277    #[serde(default)]
278    pub image_blobs: HashMap<String, Vec<u8>>,
279    /// Per-turn override for protocol-owned turn options.
280    #[serde(default, skip_serializing_if = "Option::is_none")]
281    pub protocol_turn_options: Option<crate::ProtocolTurnOptions>,
282    /// Optional externally-stable trace turn id. Normal runtime callers leave
283    /// this empty and the runtime generates one per outer turn.
284    #[serde(default, skip_serializing_if = "Option::is_none")]
285    pub trace_turn_id: Option<String>,
286    #[serde(skip)]
287    pub protocol_extension: Option<ProtocolTurnExtensionHandle>,
288    #[serde(skip)]
289    pub turn_context: TurnContext,
290}
291
292impl TurnInput {
293    pub fn empty() -> Self {
294        Self::items(std::iter::empty())
295    }
296
297    pub fn text(text: impl Into<String>) -> Self {
298        Self::items([InputItem::text(text)])
299    }
300
301    pub fn items(items: impl IntoIterator<Item = InputItem>) -> Self {
302        Self {
303            items: items.into_iter().collect(),
304            image_blobs: HashMap::new(),
305            protocol_turn_options: None,
306            trace_turn_id: None,
307            protocol_extension: None,
308            turn_context: TurnContext::default(),
309        }
310    }
311
312    pub fn with_image_blob(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
313        self.image_blobs.insert(id.into(), bytes);
314        self
315    }
316
317    pub fn with_image_blobs<I, K>(mut self, image_blobs: I) -> Self
318    where
319        I: IntoIterator<Item = (K, Vec<u8>)>,
320        K: Into<String>,
321    {
322        self.image_blobs.extend(
323            image_blobs
324                .into_iter()
325                .map(|(id, bytes)| (id.into(), bytes)),
326        );
327        self
328    }
329
330    pub fn with_image_ref(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
331        let id = id.into();
332        self.items.push(InputItem::image_ref(id.clone()));
333        self.image_blobs.insert(id, bytes);
334        self
335    }
336
337    pub fn with_protocol_turn_options(mut self, options: crate::ProtocolTurnOptions) -> Self {
338        self.protocol_turn_options = Some(options);
339        self
340    }
341
342    pub fn with_trace_turn_id(mut self, trace_turn_id: impl Into<String>) -> Self {
343        self.trace_turn_id = Some(trace_turn_id.into());
344        self
345    }
346}
347
348/// Per-turn, in-process side channel of typed plugin inputs.
349///
350/// This is an `Any`-keyed map of live Rust values handed to plugins for a
351/// single turn. It is deliberately **not** serializable: the values never
352/// survive a process boundary, so durable effect-host runs explicitly reject a
353/// turn that carries any live inputs (see
354/// [`LiveTurnInputs::durable_effect_rejection`]). Durable callers must instead
355/// encode replayable data in
356/// `protocol_turn_options` or persisted plugin state.
357#[derive(Clone, Default)]
358pub struct LiveTurnInputs {
359    inputs: HashMap<&'static str, Arc<dyn Any + Send + Sync>>,
360}
361
362impl LiveTurnInputs {
363    fn insert<T>(&mut self, plugin_id: &'static str, input: T)
364    where
365        T: Send + Sync + 'static,
366    {
367        self.inputs.insert(plugin_id, Arc::new(input));
368    }
369
370    fn get<T>(&self, plugin_id: &'static str) -> Option<&T>
371    where
372        T: 'static,
373    {
374        self.inputs
375            .get(plugin_id)
376            .and_then(|input| input.downcast_ref::<T>())
377    }
378
379    fn contains(&self, plugin_id: &'static str) -> bool {
380        self.inputs.contains_key(plugin_id)
381    }
382
383    fn is_empty(&self) -> bool {
384        self.inputs.is_empty()
385    }
386
387    pub fn plugin_ids(&self) -> Vec<&'static str> {
388        self.inputs.keys().copied().collect()
389    }
390
391    /// Returns an error when live per-turn inputs would make a durable effect
392    /// host replay depend on process-local values.
393    pub(crate) fn durable_effect_rejection(&self) -> Result<(), RuntimeError> {
394        if self.is_empty() {
395            return Ok(());
396        }
397        Err(RuntimeError::new(
398            RuntimeErrorCode::DurableEffectLivePluginInput,
399            "durable effect hosts do not support live TurnContext plugin inputs; encode replayable data in protocol_turn_options or persisted plugin state",
400        ))
401    }
402}
403
404#[derive(Clone, Default)]
405pub struct TurnContext {
406    plugin_inputs: LiveTurnInputs,
407    provider: Option<crate::ProviderHandle>,
408    model: Option<crate::ModelSpec>,
409    prompt: crate::PromptLayer,
410}
411
412impl TurnContext {
413    pub fn new() -> Self {
414        Self::default()
415    }
416
417    pub fn insert_plugin_input<T>(&mut self, plugin_id: &'static str, input: T)
418    where
419        T: Send + Sync + 'static,
420    {
421        self.plugin_inputs.insert(plugin_id, input);
422    }
423
424    pub fn set_provider(&mut self, provider: crate::ProviderHandle) {
425        self.provider = Some(provider);
426    }
427
428    pub fn provider(&self) -> Option<&crate::ProviderHandle> {
429        self.provider.as_ref()
430    }
431
432    pub fn set_model(&mut self, model: crate::ModelSpec) {
433        self.model = Some(model);
434    }
435
436    pub fn model_spec(&self) -> Option<&crate::ModelSpec> {
437        self.model.as_ref()
438    }
439
440    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
441    where
442        T: 'static,
443    {
444        self.plugin_inputs.get(plugin_id)
445    }
446
447    pub fn has_plugin_input(&self, plugin_id: &'static str) -> bool {
448        self.plugin_inputs.contains(plugin_id)
449    }
450
451    pub fn has_live_plugin_inputs(&self) -> bool {
452        !self.plugin_inputs.is_empty()
453    }
454
455    pub fn live_plugin_input_ids(&self) -> Vec<&'static str> {
456        self.plugin_inputs.plugin_ids()
457    }
458
459    /// Live plugin inputs for this turn. The durable boundary inspects this to
460    /// reject turns carrying non-serializable live state.
461    pub(crate) fn live_plugin_inputs(&self) -> &LiveTurnInputs {
462        &self.plugin_inputs
463    }
464
465    pub fn set_prompt_template(&mut self, template: crate::PromptTemplate) {
466        self.prompt.template = Some(template);
467    }
468
469    pub fn add_prompt_contribution(&mut self, contribution: crate::PromptContribution) {
470        self.prompt.add_contribution(contribution);
471    }
472
473    pub fn replace_prompt_slot(
474        &mut self,
475        slot: crate::PromptSlot,
476        contributions: impl IntoIterator<Item = crate::PromptContribution>,
477    ) {
478        self.prompt.replace_slot(slot, contributions);
479    }
480
481    pub fn clear_prompt_slot(&mut self, slot: crate::PromptSlot) {
482        self.prompt.clear_slot(slot);
483    }
484
485    pub fn set_prompt_layer(&mut self, prompt: crate::PromptLayer) {
486        self.prompt = prompt;
487    }
488
489    pub fn prompt_layer(&self) -> &crate::PromptLayer {
490        &self.prompt
491    }
492}
493
494impl fmt::Debug for TurnContext {
495    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496        f.debug_struct("TurnContext")
497            .field("plugin_inputs", &self.plugin_inputs.plugin_ids())
498            .field("has_provider", &self.provider.is_some())
499            .field("has_model", &self.model.is_some())
500            .field("has_prompt_layer", &(!self.prompt.is_empty()))
501            .finish()
502    }
503}
504
505#[derive(Clone)]
506pub struct ProtocolTurnExtensionHandle(Arc<dyn ProtocolTurnExtension>);
507
508impl ProtocolTurnExtensionHandle {
509    pub fn new(extension: impl ProtocolTurnExtension + 'static) -> Self {
510        Self(Arc::new(extension))
511    }
512
513    pub fn as_any(&self) -> &dyn Any {
514        self.0.as_any()
515    }
516
517    pub fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
518        self.0.prompt_contributions()
519    }
520}
521
522impl fmt::Debug for ProtocolTurnExtensionHandle {
523    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
524        f.write_str("ProtocolTurnExtensionHandle(..)")
525    }
526}
527
528pub trait ProtocolTurnExtension: Send + Sync {
529    fn as_any(&self) -> &dyn Any;
530
531    fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
532        Vec::new()
533    }
534}
535
536#[derive(Clone)]
537pub struct ProtocolSessionExtensionHandle(Arc<dyn ProtocolSessionExtension>);
538
539impl ProtocolSessionExtensionHandle {
540    pub fn new(extension: impl ProtocolSessionExtension + 'static) -> Self {
541        Self(Arc::new(extension))
542    }
543
544    pub fn as_any(&self) -> &dyn Any {
545        self.0.as_any()
546    }
547}
548
549impl fmt::Debug for ProtocolSessionExtensionHandle {
550    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
551        f.write_str("ProtocolSessionExtensionHandle(..)")
552    }
553}
554
555pub trait ProtocolSessionExtension: Send + Sync {
556    fn as_any(&self) -> &dyn Any;
557}
558
559#[derive(Clone, Debug)]
560pub(super) enum NormalizedItem {
561    Text(String),
562    Image(crate::AttachmentRef),
563}
564
565/// Canonical assistant output payload.
566#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
567pub struct AssistantOutput {
568    pub safe_text: String,
569    pub raw_text: String,
570    pub state: OutputState,
571}
572
573/// Quality and usability of assembled terminal output.
574#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
575#[serde(rename_all = "snake_case")]
576pub enum OutputState {
577    Usable,
578    EmptyOutput,
579    TracebackOnly,
580    RecoveredFromError,
581}
582
583/// Code execution output observed during a turn.
584#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
585pub struct CodeOutputRecord {
586    pub output: String,
587    #[serde(default, skip_serializing_if = "Option::is_none")]
588    pub error: Option<String>,
589}
590
591/// High-level execution summary for a completed turn.
592#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
593pub struct ExecutionSummary {
594    #[serde(default)]
595    pub had_tool_calls: bool,
596    #[serde(default)]
597    pub had_code_execution: bool,
598}
599
600/// Structured issue surfaced during turn execution.
601#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
602pub struct TurnIssue {
603    pub kind: String,
604    #[serde(default, skip_serializing_if = "Option::is_none")]
605    pub code: Option<String>,
606    #[serde(default, skip_serializing_if = "Option::is_none")]
607    pub terminal_reason: Option<crate::LlmTerminalReason>,
608    pub message: String,
609    #[serde(default, skip_serializing_if = "Option::is_none")]
610    pub raw: Option<String>,
611}
612
613/// Canonical high-level turn result returned to hosts.
614#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
615pub struct AssembledTurn {
616    pub state: SessionSnapshot,
617    pub outcome: crate::TurnOutcome,
618    pub assistant_output: AssistantOutput,
619    pub execution: ExecutionSummary,
620    #[serde(default)]
621    pub token_usage: TokenUsage,
622    /// Per-(session, source, model) ledger entries for child sessions whose
623    /// LLM calls completed during this turn. `token_usage` above is the
624    /// parent's own LLM tokens; `total_usage` (on the embed-facing
625    /// `TurnResult`) sums both.
626    #[serde(default)]
627    pub children_usage: Vec<TokenLedgerEntry>,
628    #[serde(default)]
629    pub tool_calls: Vec<ToolCallRecord>,
630    #[serde(default)]
631    pub errors: Vec<TurnIssue>,
632}
633
634/// Result of driving one logical host turn through any AgentFrame switches.
635///
636/// A frame switch is an internal runtime continuation, similar to compaction
637/// from a host's perspective. Callers that need a final answer can use
638/// [`LashRuntime::stream_turn_with_agent_frames`] and inspect `final_turn()`.
639#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
640pub struct AgentFrameRun {
641    pub turns: Vec<AssembledTurn>,
642}
643
644impl AgentFrameRun {
645    pub fn final_turn(&self) -> Option<&AssembledTurn> {
646        self.turns.last()
647    }
648
649    pub fn into_final_turn(mut self) -> Option<AssembledTurn> {
650        self.turns.pop()
651    }
652
653    pub fn frame_switch_count(&self) -> usize {
654        self.turns
655            .iter()
656            .filter(|turn| matches!(turn.outcome, crate::TurnOutcome::AgentFrameSwitch { .. }))
657            .count()
658    }
659}
660
661/// Termination policy knobs.
662#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
663pub struct TerminationPolicy {
664    #[serde(default)]
665    pub treat_missing_done_as_failure: bool,
666}
667
668impl Default for TerminationPolicy {
669    fn default() -> Self {
670        Self {
671            treat_missing_done_as_failure: true,
672        }
673    }
674}
675
676/// Host application sink for low-level streaming runtime events.
677/// `SessionEvent` is protocol-specific preview/progress data.
678#[async_trait::async_trait]
679pub trait EventSink: Send + Sync {
680    fn is_noop(&self) -> bool {
681        false
682    }
683
684    async fn emit(&self, event: SessionEvent);
685}
686
687/// No-op sink useful for callers that only care about final state.
688pub struct NoopEventSink;
689
690/// Static no-op event sink for callers that need a `&dyn EventSink` default.
691pub static NOOP_EVENT_SINK: NoopEventSink = NoopEventSink;
692
693#[async_trait::async_trait]
694impl EventSink for NoopEventSink {
695    fn is_noop(&self) -> bool {
696        true
697    }
698
699    async fn emit(&self, _event: SessionEvent) {}
700}
701
702/// Stable identifier for a semantic turn activity.
703#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
704#[serde(transparent)]
705pub struct TurnActivityId(pub String);
706
707impl TurnActivityId {
708    pub fn new(id: impl Into<String>) -> Self {
709        Self(id.into())
710    }
711
712    pub fn fresh() -> Self {
713        Self(uuid::Uuid::new_v4().to_string())
714    }
715}
716
717/// App-facing semantic activity emitted during a turn.
718///
719/// `id` is unique per emitted activity event. `correlation_id` groups related
720/// events in the same logical activity, such as code start/completion, tool
721/// start/completion, or text deltas from one output block.
722#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
723pub struct TurnActivity {
724    pub id: TurnActivityId,
725    pub correlation_id: TurnActivityId,
726    #[serde(flatten)]
727    pub event: TurnEvent,
728}
729
730impl TurnActivity {
731    pub fn new(correlation_id: TurnActivityId, event: TurnEvent) -> Self {
732        Self {
733            id: TurnActivityId::fresh(),
734            correlation_id,
735            event,
736        }
737    }
738
739    pub fn independent(event: TurnEvent) -> Self {
740        let correlation_id = TurnActivityId::fresh();
741        Self::new(correlation_id, event)
742    }
743}
744
745/// App-facing semantic event payload for a turn activity.
746///
747/// Unlike [`SessionEvent`], these events are stable application signals rather
748/// than low-level runtime/debug events. Public streams carry these payloads
749/// inside [`TurnActivity`] so every emitted item has identity.
750#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
751#[serde(tag = "type", rename_all = "snake_case")]
752#[allow(clippy::large_enum_variant)]
753pub enum TurnEvent {
754    QueuedWorkStarted {
755        boundary: crate::QueuedWorkClaimBoundary,
756        batch_ids: Vec<String>,
757        causes: Vec<crate::TurnCause>,
758    },
759    ModelRequestStarted {
760        protocol_iteration: usize,
761    },
762    AssistantProseDelta {
763        text: String,
764    },
765    ReasoningDelta {
766        text: String,
767    },
768    CodeBlockStarted {
769        language: String,
770        code: String,
771        #[serde(default, skip_serializing_if = "Option::is_none")]
772        graph_key: Option<String>,
773    },
774    CodeBlockCompleted {
775        language: String,
776        output: String,
777        error: Option<String>,
778        success: bool,
779        duration_ms: u64,
780        tool_call_ids: Vec<String>,
781        #[serde(default, skip_serializing_if = "Option::is_none")]
782        graph_key: Option<String>,
783    },
784    ToolCallStarted {
785        call_id: Option<String>,
786        name: String,
787        args: serde_json::Value,
788    },
789    ToolCallCompleted {
790        call_id: Option<String>,
791        name: String,
792        args: serde_json::Value,
793        output: crate::ToolCallOutput,
794        duration_ms: u64,
795    },
796    SubmittedValue {
797        value: serde_json::Value,
798    },
799    ToolValue {
800        tool_name: String,
801        value: serde_json::Value,
802    },
803    Usage {
804        protocol_iteration: usize,
805        usage: TokenUsage,
806        cumulative: TokenUsage,
807    },
808    ChildUsage {
809        session_id: String,
810        source: String,
811        model: String,
812        protocol_iteration: usize,
813        usage: TokenUsage,
814        cumulative: TokenUsage,
815    },
816    RetryStatus {
817        wait_seconds: u64,
818        attempt: usize,
819        max_attempts: usize,
820        reason: String,
821    },
822    PluginRuntime {
823        plugin_id: String,
824        event: crate::PluginRuntimeEvent,
825    },
826    QueuedInputAccepted {
827        checkpoint: crate::CheckpointKind,
828        inputs: Vec<crate::AcceptedInjectedTurnInput>,
829    },
830    QueuedMessagesCommitted {
831        messages: Vec<crate::PluginMessage>,
832        checkpoint: crate::CheckpointKind,
833    },
834    Error {
835        message: String,
836    },
837}
838
839#[async_trait::async_trait]
840pub trait TurnActivitySink: Send + Sync {
841    fn is_noop(&self) -> bool {
842        false
843    }
844
845    async fn emit(&self, activity: TurnActivity);
846}
847
848pub struct NoopTurnActivitySink;
849
850/// Static no-op turn-activity sink for callers that need a `&dyn TurnActivitySink` default.
851pub static NOOP_TURN_ACTIVITY_SINK: NoopTurnActivitySink = NoopTurnActivitySink;
852
853#[async_trait::async_trait]
854impl TurnActivitySink for NoopTurnActivitySink {
855    fn is_noop(&self) -> bool {
856        true
857    }
858
859    async fn emit(&self, _activity: TurnActivity) {}
860}
861
862/// Optional sinks and scoped effect controller passed to one of [`LashRuntime`]'s
863/// turn-driving entry points (`stream_turn`,
864/// `stream_turn_with_agent_frames`).
865///
866/// Construct via [`TurnOptions::new`] and chain `with_*` builders. Event sinks
867/// default to no-op sinks. Execution scope is explicit and required at every
868/// runtime boundary that can execute nondeterministic work.
869pub struct TurnOptions<'a> {
870    events: Option<&'a dyn EventSink>,
871    turn_events: Option<&'a dyn TurnActivitySink>,
872    scoped_effect_controller: ScopedEffectController<'a>,
873    cancel: CancellationToken,
874}
875
876impl<'a> TurnOptions<'a> {
877    pub fn new(
878        cancel: CancellationToken,
879        scoped_effect_controller: ScopedEffectController<'a>,
880    ) -> Self {
881        Self {
882            events: None,
883            turn_events: None,
884            scoped_effect_controller,
885            cancel,
886        }
887    }
888
889    pub fn with_events(mut self, events: &'a dyn EventSink) -> Self {
890        self.events = Some(events);
891        self
892    }
893
894    pub fn with_turn_events(mut self, turn_events: &'a dyn TurnActivitySink) -> Self {
895        self.turn_events = Some(turn_events);
896        self
897    }
898
899    pub(crate) fn events_or_noop(&self) -> &'a dyn EventSink {
900        self.events.unwrap_or(&NOOP_EVENT_SINK)
901    }
902
903    pub(crate) fn turn_events_or_noop(&self) -> &'a dyn TurnActivitySink {
904        self.turn_events.unwrap_or(&NOOP_TURN_ACTIVITY_SINK)
905    }
906
907    pub(crate) fn execution_scope_id(&self) -> &str {
908        self.scoped_effect_controller.scope_id()
909    }
910
911    pub(crate) fn scoped_effect_controller(&self) -> ScopedEffectController<'a> {
912        self.scoped_effect_controller.clone()
913    }
914}
915
916enum RuntimeStreamEvent {
917    Session(SessionEvent),
918    Turn(TurnActivity),
919}
920
921#[derive(Clone)]
922pub struct SessionStoreCreateRequest {
923    pub session_id: String,
924    pub relation: SessionRelation,
925    pub policy: SessionPolicy,
926}
927
928impl SessionStoreCreateRequest {
929    pub fn parent_session_id(&self) -> Option<&str> {
930        self.relation.parent_session_id()
931    }
932}
933
934#[async_trait::async_trait]
935pub trait SessionStoreFactory: Send + Sync {
936    /// Durability tier the stores produced by this factory provide; defaults to
937    /// [`DurabilityTier::Inline`].
938    fn durability_tier(&self) -> crate::DurabilityTier {
939        crate::DurabilityTier::Inline
940    }
941
942    async fn create_store(
943        &self,
944        request: &SessionStoreCreateRequest,
945    ) -> Result<Arc<dyn crate::store::RuntimePersistence>, String>;
946
947    async fn open_existing_store(
948        &self,
949        _request: &SessionStoreCreateRequest,
950    ) -> Result<Option<Arc<dyn crate::store::RuntimePersistence>>, String> {
951        Ok(None)
952    }
953
954    async fn delete_session(&self, session_id: &str) -> Result<(), String>;
955}
956
957/// Generic runtime for CLI or programmatic embedding.
958pub struct LashRuntime {
959    pub(in crate::runtime) session: Option<Session>,
960    pub(in crate::runtime) policy: SessionPolicy,
961    pub(in crate::runtime) host: RuntimeHost,
962    pub(in crate::runtime) services: RuntimeServices,
963    pub(in crate::runtime) state: RuntimeSessionState,
964    pub(in crate::runtime) runtime_scope_id: Arc<str>,
965    pub(in crate::runtime) managed_sessions: Arc<Mutex<HashMap<String, RuntimeHandle>>>,
966    pub(in crate::runtime) managed_turns: Arc<Mutex<HashMap<String, ManagedSessionTurn>>>,
967    /// Protocol-owned turn options for this session.
968    pub(in crate::runtime) protocol_turn_options: crate::ProtocolTurnOptions,
969    /// Session-scoped token cost ledger. Shared by ALL
970    /// `RuntimeSessionServices` instances created from this runtime
971    /// (both per-turn and async maintenance). Entries accumulate here
972    /// and are drained into `state.token_ledger` at turn-commit time.
973    pub(in crate::runtime) shared_token_ledger: Arc<std::sync::Mutex<Vec<TokenLedgerEntry>>>,
974    pub(in crate::runtime) process_sync_needed: Arc<AtomicBool>,
975    pub(in crate::runtime) turn_phase_probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
976    /// Resident-graph policy chosen by the host. Controls whether
977    /// [`LashRuntime::refresh_session_graph_from_store`] reloads the full
978    /// graph or just the active path, matching the trimming behavior set at
979    /// load time via [`apply_residency_on_load`](crate::runtime::apply_residency_on_load).
980    pub(in crate::runtime) residency: Residency,
981}