// lash_core/runtime/mod.rs

1mod assembly;
2mod builder;
3pub(crate) mod causal;
4mod config_ops;
5mod effect;
6mod environment;
7mod error;
8mod host;
9mod in_memory_store;
10mod io;
11mod lifecycle;
12mod observation;
13mod process;
14mod process_work_runner;
15mod process_worker;
16mod queued_work_runner;
17mod session_api;
18mod session_manager;
19mod session_ops;
20mod state;
21#[cfg(test)]
22pub(crate) mod tests;
23mod turn_boundary;
24mod turn_commit_draft;
25mod turn_driver;
26mod turn_graph_editor;
27mod turn_loop;
28mod turn_queue;
29mod usage;
30
31use std::any::Any;
32use std::collections::HashMap;
33use std::fmt;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::atomic::{AtomicBool, Ordering};
37
38use tokio::sync::{Mutex, mpsc};
39use tokio_util::sync::CancellationToken;
40
41use crate::llm::types::{
42    LlmOutputPart, LlmProviderTraceEvent, LlmProviderTraceSender, LlmRequest, LlmResponse,
43    LlmStreamEvent, LlmUsage,
44};
45use crate::plugin::{
46    CheckpointHookContext, PrepareTurnRequest, SessionConfigChangedContext, SessionRelation,
47};
48use crate::sansio::{LlmCallError, Response};
49use crate::session_model::{
50    Message, MessageRole, Part, PartKind, PruneState, RuntimeSessionPolicy, SessionEvent,
51    SessionPolicy, TokenUsage, fresh_message_id, make_error_event, reassign_part_ids, shared_parts,
52    transport_stream_events,
53};
54use crate::{
55    CheckpointKind, PersistentRuntimeServices, PluginActionInvokeError, PromptHookContext,
56    RuntimeServices, SandboxMessage, Session, SessionCreateRequest, SessionError, SessionHandle,
57    SessionSnapshot, SessionStartPoint, ToolCallRecord, TurnFinish, TurnOutcome, TurnStop,
58};
59use crate::{Effect, TurnMachine};
60
61use host::*;
62use session_manager::*;
63use turn_boundary::*;
64use turn_commit_draft::*;
65use turn_driver::*;
66
/// Time-to-live for a runtime turn lease. The `_MS` suffix indicates
/// milliseconds, which makes the value 15 minutes.
pub(crate) const RUNTIME_TURN_LEASE_TTL_MS: u64 = 15 * 60 * 1000;
68
69// `PromptUsage` is re-exported below alongside the runtime's own types.
70pub use lash_sansio::PromptUsage;
71
72use assembly::{
73    LlmDebugText, LlmDebugToolCall, LlmStreamAccumulator, LlmStreamDebugState, LlmStreamEventLog,
74    LlmStreamState, LlmStreamSummary, TurnAssembler,
75};
76#[cfg(test)]
77#[allow(unused_imports)]
78use assembly::{classify_output_state, sanitize_assistant_output};
79pub use builder::EmbeddedRuntimeBuilder;
80pub use causal::process_event_invocation;
81pub(crate) use causal::tool_retry_sleep_invocation;
82pub(crate) use effect::RuntimeEffectControllerHandle;
83pub use effect::{
84    AwaitEventKey, AwaitEventWaitIdentity, CausalRef, EffectHost, ExecutionScope,
85    ExternalCompletionError, InlineEffectHost, InlineRuntimeEffectController, LlmAttachmentSpec,
86    LlmRequestSpec, ProcessCommand, ProcessEffectOutcome, Resolution, ResolveOutcome,
87    RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
88    RuntimeEffectEnvelope, RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome,
89    RuntimeInvocation, RuntimeReplay, RuntimeScope, RuntimeSubject, ScopedEffectController,
90    ToolCallLaunch,
91};
92pub use environment::{ParkedSession, Residency, RuntimeEnvironment, RuntimeEnvironmentBuilder};
93pub use error::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
94pub use host::{EmbeddedRuntimeHost, ProcessRuntimeHost, RuntimeHostConfig};
95pub use in_memory_store::{InMemorySessionStore, InMemorySessionStoreFactory};
96use io::normalize_input_items;
97pub use observation::{
98    InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
99    LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
100    LiveReplaySubscription, RuntimeHandle, RuntimeObservation, SessionCursor, SessionCursorError,
101    SessionObservation, SessionObservationEvent, SessionObservationEventPayload,
102    SessionObservationSubscription, SessionProcessEventKind, SessionQueueEventKind, SessionResume,
103    SessionRevision,
104};
105#[cfg(any(test, feature = "testing"))]
106pub use process::TestLocalProcessRegistry;
107pub use process::{
108    DefaultProcessCancelAbility, ObservedProcess, ObservedProcessEvent, ObservedWorkItem,
109    PROCESS_LEASE_SCHEMA_VERSION, PreparedProcessEventAppend, ProcessAwaitOutput,
110    ProcessCancelAbility, ProcessCancelAllRequest, ProcessCancelRequest, ProcessCancelSource,
111    ProcessCancelSummary, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
112    ProcessEventSemantics, ProcessEventSemanticsSpec, ProcessEventType, ProcessExecutionContext,
113    ProcessExecutionEnvRef, ProcessExecutionEnvSpec, ProcessExternalRef, ProcessHandleDescriptor,
114    ProcessHandleGrant, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessId, ProcessInput,
115    ProcessLease, ProcessLeaseCompletion, ProcessLifecycleStatus, ProcessListFilter,
116    ProcessListMode, ProcessOpScope, ProcessOriginator, ProcessProvenance, ProcessRecord,
117    ProcessRegistration, ProcessRegistry, ProcessService, ProcessSessionDeleteReport,
118    ProcessSpawnProvenance, ProcessStartGrant, ProcessStartOptions, ProcessStartRequest,
119    ProcessStatus, ProcessStatusFilter, ProcessTerminalSemantics, ProcessTerminalSpec,
120    ProcessTerminalState, ProcessValueSelector, ProcessWake, ProcessWakeDedupeKey,
121    ProcessWakeDelivery, ProcessWakeSpec, ProcessWorkObserver, ProcessWorkSnapshot, SessionScope,
122    SessionScopeId, UnavailableProcessService, WaitKind, WaitState, current_epoch_ms,
123    epoch_ms_from_system_time, lashlang_process_event_types, lashlang_process_signal_event_types,
124    load_process_execution_env, materialize_process_event_semantics, persist_process_execution_env,
125    prepare_process_event_append, prepare_process_registration, process_event_payload_hash,
126    process_signal_event_type, process_signal_name_from_event_type, process_signal_wait_key,
127    process_wake_delivery, process_wake_input_from_event_payload, process_wake_turn_cause,
128    process_wake_turn_text, require_event_replay, system_time_from_epoch_ms,
129    validate_process_signal_name,
130};
131pub use process_work_runner::{
132    InlineProcessRunHandle, ProcessRunHandle, ProcessWorkDriver, ProcessWorkPoke, ProcessWorkRunner,
133};
134pub use process_worker::{DurableProcessWorker, DurableProcessWorkerConfig};
135pub use queued_work_runner::{
136    QueuedWorkPoke, QueuedWorkRunHandle, QueuedWorkRunOutcome, QueuedWorkRunRequest,
137    QueuedWorkRunner,
138};
139pub use session_manager::DirectCompletionClient;
140pub use state::RuntimeSessionState;
141use state::{
142    append_session_nodes_to_state, apply_residency_on_load, apply_session_checkpoint,
143    apply_session_head, normalize_session_graph, open_agent_frame_in_state,
144};
145pub use turn_loop::ensure_durable_effect_input;
146pub use turn_queue::{
147    DeliveryPolicy, MergeKey, QUEUED_WORK_CLAIM_TTL_MS, QueuedCheckpointWork, QueuedTurnWork,
148    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
149    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, SessionCommand, SessionCommandReceipt,
150    SlotPolicy, process_wake_batch_draft,
151};
152pub use usage::{
153    SessionUsageReport, TokenLedgerEntry, UsageReportRow, UsageTotals, diff_token_ledger,
154    diff_usage_reports,
155};
156use usage::{merge_ledger_entry, merge_usage_delta_entries, normalize_prompt_usage};
157
/// Phases of a runtime turn that can be reported to a
/// [`RuntimeTurnPhaseProbe`] through its `begin`/`end` hooks.
///
/// NOTE(review): the variants appear to be declared in the order a turn passes
/// through them — confirm against the turn driver before relying on that.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum RuntimeTurnPhase {
    ContextTransform,
    BeforeTurnHooks,
    PromptBuild,
    EffectLoop,
    FinalizeTurn,
    PersistTurn,
    FinalCommit,
    PostPersistHooks,
}
170
/// Observer for turn-phase boundaries.
///
/// Typed phases are reported through `begin`/`end`, which implementors must
/// provide. String-named phases are reported through `begin_named`/`end_named`,
/// whose default implementations do nothing.
#[doc(hidden)]
pub trait RuntimeTurnPhaseProbe: Send + Sync {
    /// Reports the start of `phase`.
    fn begin(&self, phase: RuntimeTurnPhase);
    /// Reports the end of `phase`.
    fn end(&self, phase: RuntimeTurnPhase);
    /// Reports the start of a string-named phase; no-op by default.
    fn begin_named(&self, _phase: &str) {}
    /// Reports the end of a string-named phase; no-op by default.
    fn end_named(&self, _phase: &str) {}
}
178
179#[doc(hidden)]
180#[derive(Clone, Default)]
181pub struct RuntimeTurnPhaseProbeSlot {
182    probes: Arc<StdMutex<HashMap<crate::SessionScopeId, Arc<dyn RuntimeTurnPhaseProbe>>>>,
183}
184
185impl RuntimeTurnPhaseProbeSlot {
186    pub fn set_for_session(
187        &self,
188        session_id: impl Into<String>,
189        probe: Arc<dyn RuntimeTurnPhaseProbe>,
190    ) {
191        self.set_for_scope(&crate::SessionScope::new(session_id), probe);
192    }
193
194    pub fn set_for_scope(
195        &self,
196        scope: &crate::SessionScope,
197        probe: Arc<dyn RuntimeTurnPhaseProbe>,
198    ) {
199        self.probes
200            .lock()
201            .expect("runtime phase probe slot")
202            .insert(scope.id(), probe);
203    }
204
205    pub fn get_for_scope(
206        &self,
207        scope: &crate::SessionScope,
208    ) -> Option<Arc<dyn RuntimeTurnPhaseProbe>> {
209        let probes = self.probes.lock().expect("runtime phase probe slot");
210        probes.get(&scope.id()).cloned().or_else(|| {
211            probes
212                .get(&crate::SessionScope::new(&scope.session_id).id())
213                .cloned()
214        })
215    }
216}
217
218#[doc(hidden)]
219pub struct RuntimeNamedPhase {
220    probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
221    phase: &'static str,
222}
223
224impl RuntimeNamedPhase {
225    pub fn begin(
226        probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
227        phase: &'static str,
228    ) -> RuntimeNamedPhase {
229        if let Some(probe) = probe.as_ref() {
230            probe.begin_named(phase);
231        }
232        RuntimeNamedPhase { probe, phase }
233    }
234}
235
236impl Drop for RuntimeNamedPhase {
237    fn drop(&mut self) {
238        if let Some(probe) = self.probe.as_ref() {
239            probe.end_named(self.phase);
240        }
241    }
242}
243
/// One item of host-provided turn input.
///
/// Serialized with an internal `type` tag whose values are the snake_case
/// variant names (`text`, `image_ref`).
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InputItem {
    /// Plain text.
    Text { text: String },
    /// Reference to an image by id.
    // NOTE(review): presumably resolved against `TurnInput::image_blobs`,
    // which `TurnInput::with_image_ref` fills under the same id — confirm.
    ImageRef { id: String },
}
251
252impl InputItem {
253    pub fn text(text: impl Into<String>) -> Self {
254        Self::Text { text: text.into() }
255    }
256
257    pub fn image_ref(id: impl Into<String>) -> Self {
258        Self::ImageRef { id: id.into() }
259    }
260}
261
/// Host-provided per-turn input.
///
/// `protocol_extension` and `turn_context` are marked `#[serde(skip)]`: they
/// are never serialized and come back as their `Default` values after
/// deserialization.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct TurnInput {
    /// Input items in the order the host supplied them.
    pub items: Vec<InputItem>,
    /// Raw image bytes keyed by id. [`TurnInput::with_image_ref`] stores bytes
    /// here under the same id it puts in the matching [`InputItem::ImageRef`].
    /// Empty when absent from serialized input.
    #[serde(default)]
    pub image_blobs: HashMap<String, Vec<u8>>,
    /// Per-turn override for protocol-owned turn options.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub protocol_turn_options: Option<crate::ProtocolTurnOptions>,
    /// Optional externally-stable trace turn id. Normal runtime callers leave
    /// this empty and the runtime generates one per outer turn.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_turn_id: Option<String>,
    /// In-process protocol extension for this turn; skipped by serde.
    #[serde(skip)]
    pub protocol_extension: Option<ProtocolTurnExtensionHandle>,
    /// In-process turn context; skipped by serde.
    #[serde(skip)]
    pub turn_context: TurnContext,
}
280
281impl TurnInput {
282    pub fn empty() -> Self {
283        Self::items(std::iter::empty())
284    }
285
286    pub fn text(text: impl Into<String>) -> Self {
287        Self::items([InputItem::text(text)])
288    }
289
290    pub fn items(items: impl IntoIterator<Item = InputItem>) -> Self {
291        Self {
292            items: items.into_iter().collect(),
293            image_blobs: HashMap::new(),
294            protocol_turn_options: None,
295            trace_turn_id: None,
296            protocol_extension: None,
297            turn_context: TurnContext::default(),
298        }
299    }
300
301    pub fn with_image_blob(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
302        self.image_blobs.insert(id.into(), bytes);
303        self
304    }
305
306    pub fn with_image_blobs<I, K>(mut self, image_blobs: I) -> Self
307    where
308        I: IntoIterator<Item = (K, Vec<u8>)>,
309        K: Into<String>,
310    {
311        self.image_blobs.extend(
312            image_blobs
313                .into_iter()
314                .map(|(id, bytes)| (id.into(), bytes)),
315        );
316        self
317    }
318
319    pub fn with_image_ref(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
320        let id = id.into();
321        self.items.push(InputItem::image_ref(id.clone()));
322        self.image_blobs.insert(id, bytes);
323        self
324    }
325
326    pub fn with_protocol_turn_options(mut self, options: crate::ProtocolTurnOptions) -> Self {
327        self.protocol_turn_options = Some(options);
328        self
329    }
330
331    pub fn with_trace_turn_id(mut self, trace_turn_id: impl Into<String>) -> Self {
332        self.trace_turn_id = Some(trace_turn_id.into());
333        self
334    }
335}
336
/// Per-turn, in-process side channel of typed plugin inputs.
///
/// This is an `Any`-keyed map of live Rust values handed to plugins for a
/// single turn. It is deliberately **not** serializable: the values never
/// survive a process boundary, so durable effect-host runs explicitly reject a
/// turn that carries any live inputs (see
/// [`LiveTurnInputs::durable_effect_rejection`]). Durable callers must instead
/// encode replayable data in `protocol_turn_options` or persisted plugin
/// state.
#[derive(Clone, Default)]
pub struct LiveTurnInputs {
    // One type-erased value per plugin id. Cloning the map clones the `Arc`s,
    // so clones share the same underlying values.
    inputs: HashMap<&'static str, Arc<dyn Any + Send + Sync>>,
}
350
351impl LiveTurnInputs {
352    fn insert<T>(&mut self, plugin_id: &'static str, input: T)
353    where
354        T: Send + Sync + 'static,
355    {
356        self.inputs.insert(plugin_id, Arc::new(input));
357    }
358
359    fn get<T>(&self, plugin_id: &'static str) -> Option<&T>
360    where
361        T: 'static,
362    {
363        self.inputs
364            .get(plugin_id)
365            .and_then(|input| input.downcast_ref::<T>())
366    }
367
368    fn contains(&self, plugin_id: &'static str) -> bool {
369        self.inputs.contains_key(plugin_id)
370    }
371
372    fn is_empty(&self) -> bool {
373        self.inputs.is_empty()
374    }
375
376    pub fn plugin_ids(&self) -> Vec<&'static str> {
377        self.inputs.keys().copied().collect()
378    }
379
380    /// Returns an error when live per-turn inputs would make a durable effect
381    /// host replay depend on process-local values.
382    pub(crate) fn durable_effect_rejection(&self) -> Result<(), RuntimeError> {
383        if self.is_empty() {
384            return Ok(());
385        }
386        Err(RuntimeError::new(
387            RuntimeErrorCode::DurableEffectLivePluginInput,
388            "durable effect hosts do not support live TurnContext plugin inputs; encode replayable data in protocol_turn_options or persisted plugin state",
389        ))
390    }
391}
392
/// In-process context attached to a single turn: live plugin inputs, an
/// optional provider handle, an optional model spec, and a prompt layer.
///
/// `Default` yields a context with no plugin inputs, no provider, no model and
/// the default prompt layer.
#[derive(Clone, Default)]
pub struct TurnContext {
    // Typed, non-serializable values handed to plugins for this turn.
    plugin_inputs: LiveTurnInputs,
    // Set through `set_provider`; `None` until then.
    provider: Option<crate::ProviderHandle>,
    // Set through `set_model`; `None` until then.
    model: Option<crate::ModelSpec>,
    // Prompt template and contributions edited through the `*_prompt_*` methods.
    prompt: crate::PromptLayer,
}
400
401impl TurnContext {
402    pub fn new() -> Self {
403        Self::default()
404    }
405
406    pub fn insert_plugin_input<T>(&mut self, plugin_id: &'static str, input: T)
407    where
408        T: Send + Sync + 'static,
409    {
410        self.plugin_inputs.insert(plugin_id, input);
411    }
412
413    pub fn set_provider(&mut self, provider: crate::ProviderHandle) {
414        self.provider = Some(provider);
415    }
416
417    pub fn provider(&self) -> Option<&crate::ProviderHandle> {
418        self.provider.as_ref()
419    }
420
421    pub fn set_model(&mut self, model: crate::ModelSpec) {
422        self.model = Some(model);
423    }
424
425    pub fn model_spec(&self) -> Option<&crate::ModelSpec> {
426        self.model.as_ref()
427    }
428
429    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
430    where
431        T: 'static,
432    {
433        self.plugin_inputs.get(plugin_id)
434    }
435
436    pub fn has_plugin_input(&self, plugin_id: &'static str) -> bool {
437        self.plugin_inputs.contains(plugin_id)
438    }
439
440    pub fn has_live_plugin_inputs(&self) -> bool {
441        !self.plugin_inputs.is_empty()
442    }
443
444    pub fn live_plugin_input_ids(&self) -> Vec<&'static str> {
445        self.plugin_inputs.plugin_ids()
446    }
447
448    /// Live plugin inputs for this turn. The durable boundary inspects this to
449    /// reject turns carrying non-serializable live state.
450    pub(crate) fn live_plugin_inputs(&self) -> &LiveTurnInputs {
451        &self.plugin_inputs
452    }
453
454    pub fn set_prompt_template(&mut self, template: crate::PromptTemplate) {
455        self.prompt.template = Some(template);
456    }
457
458    pub fn add_prompt_contribution(&mut self, contribution: crate::PromptContribution) {
459        self.prompt.add_contribution(contribution);
460    }
461
462    pub fn replace_prompt_slot(
463        &mut self,
464        slot: crate::PromptSlot,
465        contributions: impl IntoIterator<Item = crate::PromptContribution>,
466    ) {
467        self.prompt.replace_slot(slot, contributions);
468    }
469
470    pub fn clear_prompt_slot(&mut self, slot: crate::PromptSlot) {
471        self.prompt.clear_slot(slot);
472    }
473
474    pub fn set_prompt_layer(&mut self, prompt: crate::PromptLayer) {
475        self.prompt = prompt;
476    }
477
478    pub fn prompt_layer(&self) -> &crate::PromptLayer {
479        &self.prompt
480    }
481}
482
483impl fmt::Debug for TurnContext {
484    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
485        f.debug_struct("TurnContext")
486            .field("plugin_inputs", &self.plugin_inputs.plugin_ids())
487            .field("has_provider", &self.provider.is_some())
488            .field("has_model", &self.model.is_some())
489            .field("has_prompt_layer", &(!self.prompt.is_empty()))
490            .finish()
491    }
492}
493
494#[derive(Clone)]
495pub struct ProtocolTurnExtensionHandle(Arc<dyn ProtocolTurnExtension>);
496
497impl ProtocolTurnExtensionHandle {
498    pub fn new(extension: impl ProtocolTurnExtension + 'static) -> Self {
499        Self(Arc::new(extension))
500    }
501
502    pub fn as_any(&self) -> &dyn Any {
503        self.0.as_any()
504    }
505
506    pub fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
507        self.0.prompt_contributions()
508    }
509}
510
511impl fmt::Debug for ProtocolTurnExtensionHandle {
512    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
513        f.write_str("ProtocolTurnExtensionHandle(..)")
514    }
515}
516
517pub trait ProtocolTurnExtension: Send + Sync {
518    fn as_any(&self) -> &dyn Any;
519
520    fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
521        Vec::new()
522    }
523}
524
/// Cheaply clonable, type-erased handle to a [`ProtocolSessionExtension`].
#[derive(Clone)]
pub struct ProtocolSessionExtensionHandle(Arc<dyn ProtocolSessionExtension>);

impl ProtocolSessionExtensionHandle {
    /// Wraps `extension` in a shared handle.
    pub fn new(extension: impl ProtocolSessionExtension + 'static) -> Self {
        let shared: Arc<dyn ProtocolSessionExtension> = Arc::new(extension);
        ProtocolSessionExtensionHandle(shared)
    }

    /// Exposes the wrapped extension as `Any` so callers can downcast it.
    pub fn as_any(&self) -> &dyn Any {
        let Self(extension) = self;
        extension.as_any()
    }
}

/// Prints a fixed placeholder; the wrapped extension is not required to be
/// `Debug`.
impl fmt::Debug for ProtocolSessionExtensionHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ProtocolSessionExtensionHandle(..)")
    }
}

/// Protocol-defined extension attached to a session.
pub trait ProtocolSessionExtension: Send + Sync {
    /// Returns `self` as `Any` for downcasting to the concrete type.
    fn as_any(&self) -> &dyn Any;
}
547
/// Turn input item in normalized form: text, or an image as an attachment
/// reference.
// NOTE(review): presumably produced by `io::normalize_input_items`, which this
// module imports — confirm.
#[derive(Clone, Debug)]
pub(super) enum NormalizedItem {
    Text(String),
    Image(crate::AttachmentRef),
}
553
/// Canonical assistant output payload.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct AssistantOutput {
    // NOTE(review): `safe_text` is presumably the sanitized form of `raw_text`
    // (this module imports `sanitize_assistant_output` for tests) — confirm.
    pub safe_text: String,
    pub raw_text: String,
    /// Quality classification of this output.
    pub state: OutputState,
}
561
/// Quality and usability of assembled terminal output.
///
/// Serialized as the snake_case variant name (for example `empty_output`).
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OutputState {
    Usable,
    EmptyOutput,
    TracebackOnly,
    RecoveredFromError,
}
571
/// RLM code execution output observed during a turn.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct CodeOutputRecord {
    /// Output text of the execution.
    pub output: String,
    /// Error text, if any. Omitted from serialized output when `None` and read
    /// back as `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
579
/// High-level execution summary for a completed turn.
///
/// Both flags deserialize to `false` when absent from the input.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ExecutionSummary {
    #[serde(default)]
    pub had_tool_calls: bool,
    #[serde(default)]
    pub had_code_execution: bool,
}
588
/// Structured issue surfaced during turn execution.
///
/// `kind` and `message` are always present. The optional fields are omitted
/// from serialized output when `None` and read back as `None` when absent.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct TurnIssue {
    pub kind: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub code: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub terminal_reason: Option<crate::LlmTerminalReason>,
    pub message: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub raw: Option<String>,
}
601
/// Canonical high-level turn result returned to hosts.
///
/// Fields marked `#[serde(default)]` take their `Default` value when absent
/// from serialized input.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct AssembledTurn {
    /// Session snapshot carried with this result.
    pub state: SessionSnapshot,
    /// How the turn ended.
    pub outcome: crate::TurnOutcome,
    /// Assistant output assembled for the turn.
    pub assistant_output: AssistantOutput,
    /// Whether tool calls or code execution happened.
    pub execution: ExecutionSummary,
    #[serde(default)]
    pub token_usage: TokenUsage,
    /// Per-(session, source, model) ledger entries for child sessions whose
    /// LLM calls completed during this turn. `token_usage` above is the
    /// parent's own LLM tokens; `total_usage` (on the embed-facing
    /// `TurnResult`) sums both.
    #[serde(default)]
    pub children_usage: Vec<TokenLedgerEntry>,
    #[serde(default)]
    pub tool_calls: Vec<ToolCallRecord>,
    #[serde(default)]
    pub errors: Vec<TurnIssue>,
}
622
/// Result of driving one logical host turn through any AgentFrame switches.
///
/// A frame switch is an internal runtime continuation, similar to compaction
/// from a host's perspective. Callers that need a final answer can use
/// [`LashRuntime::stream_turn_with_agent_frames`] and inspect `final_turn()`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct AgentFrameRun {
    /// Assembled turns of the run; `final_turn()` returns the last element.
    pub turns: Vec<AssembledTurn>,
}
632
633impl AgentFrameRun {
634    pub fn final_turn(&self) -> Option<&AssembledTurn> {
635        self.turns.last()
636    }
637
638    pub fn into_final_turn(mut self) -> Option<AssembledTurn> {
639        self.turns.pop()
640    }
641
642    pub fn frame_switch_count(&self) -> usize {
643        self.turns
644            .iter()
645            .filter(|turn| matches!(turn.outcome, crate::TurnOutcome::AgentFrameSwitch { .. }))
646            .count()
647    }
648}
649
650/// Termination policy knobs.
651#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
652pub struct TerminationPolicy {
653    #[serde(default)]
654    pub treat_missing_done_as_failure: bool,
655}
656
657impl Default for TerminationPolicy {
658    fn default() -> Self {
659        Self {
660            treat_missing_done_as_failure: true,
661        }
662    }
663}
664
/// Host application sink for low-level streaming runtime events.
/// `SessionEvent` is protocol-specific preview/progress data.
#[async_trait::async_trait]
pub trait EventSink: Send + Sync {
    /// Whether this sink discards every event. Defaults to `false`;
    /// [`NoopEventSink`] overrides it to return `true`.
    // NOTE(review): presumably lets emitters skip building events that nobody
    // will read — confirm at the call sites.
    fn is_noop(&self) -> bool {
        false
    }

    /// Delivers one event to the sink.
    async fn emit(&self, event: SessionEvent);
}
675
/// No-op sink useful for callers that only care about final state.
pub struct NoopEventSink;

/// Static no-op event sink for callers that need a `&dyn EventSink` default.
pub static NOOP_EVENT_SINK: NoopEventSink = NoopEventSink;

#[async_trait::async_trait]
impl EventSink for NoopEventSink {
    /// Always `true`: this sink discards everything it is given.
    fn is_noop(&self) -> bool {
        true
    }

    /// Drops the event without doing anything.
    async fn emit(&self, _event: SessionEvent) {}
}
690
/// Stable identifier for a semantic turn activity.
///
/// `#[serde(transparent)]` makes it serialize as the bare inner string.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct TurnActivityId(pub String);
695
696impl TurnActivityId {
697    pub fn new(id: impl Into<String>) -> Self {
698        Self(id.into())
699    }
700
701    pub fn fresh() -> Self {
702        Self(uuid::Uuid::new_v4().to_string())
703    }
704}
705
/// App-facing semantic activity emitted during a turn.
///
/// `id` is unique per emitted activity event. `correlation_id` groups related
/// events in the same logical activity, such as code start/completion, tool
/// start/completion, or text deltas from one output block.
///
/// `event` is `#[serde(flatten)]`ed, so its `type` tag and payload fields are
/// serialized alongside `id` and `correlation_id` rather than nested.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct TurnActivity {
    pub id: TurnActivityId,
    pub correlation_id: TurnActivityId,
    #[serde(flatten)]
    pub event: TurnEvent,
}
718
719impl TurnActivity {
720    pub fn new(correlation_id: TurnActivityId, event: TurnEvent) -> Self {
721        Self {
722            id: TurnActivityId::fresh(),
723            correlation_id,
724            event,
725        }
726    }
727
728    pub fn independent(event: TurnEvent) -> Self {
729        let correlation_id = TurnActivityId::fresh();
730        Self::new(correlation_id, event)
731    }
732}
733
/// App-facing semantic event payload for a turn activity.
///
/// Unlike [`SessionEvent`], these events are stable application signals rather
/// than low-level runtime/debug events. Public streams carry these payloads
/// inside [`TurnActivity`] so every emitted item has identity.
///
/// Serialized with an internal `type` tag whose values are the snake_case
/// variant names (for example `tool_call_started`).
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
// NOTE(review): `large_enum_variant` is allowed without a stated reason —
// presumably boxing the larger payloads was judged not worth it; confirm.
#[allow(clippy::large_enum_variant)]
pub enum TurnEvent {
    QueuedWorkStarted {
        boundary: crate::QueuedWorkClaimBoundary,
        batch_ids: Vec<String>,
        causes: Vec<crate::TurnCause>,
    },
    ModelRequestStarted {
        protocol_iteration: usize,
    },
    AssistantProseDelta {
        text: String,
    },
    ReasoningDelta {
        text: String,
    },
    CodeBlockStarted {
        language: String,
        code: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        graph_key: Option<String>,
    },
    CodeBlockCompleted {
        language: String,
        output: String,
        error: Option<String>,
        success: bool,
        duration_ms: u64,
        tool_call_ids: Vec<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        graph_key: Option<String>,
    },
    ToolCallStarted {
        call_id: Option<String>,
        name: String,
        args: serde_json::Value,
    },
    ToolCallCompleted {
        call_id: Option<String>,
        name: String,
        args: serde_json::Value,
        output: crate::ToolCallOutput,
        duration_ms: u64,
    },
    SubmittedValue {
        value: serde_json::Value,
    },
    ToolValue {
        tool_name: String,
        value: serde_json::Value,
    },
    Usage {
        protocol_iteration: usize,
        usage: TokenUsage,
        cumulative: TokenUsage,
    },
    ChildUsage {
        session_id: String,
        source: String,
        model: String,
        protocol_iteration: usize,
        usage: TokenUsage,
        cumulative: TokenUsage,
    },
    RetryStatus {
        wait_seconds: u64,
        attempt: usize,
        max_attempts: usize,
        reason: String,
    },
    PluginRuntime {
        plugin_id: String,
        event: crate::PluginRuntimeEvent,
    },
    QueuedInputAccepted {
        checkpoint: crate::CheckpointKind,
        inputs: Vec<crate::AcceptedInjectedTurnInput>,
    },
    QueuedMessagesCommitted {
        messages: Vec<crate::PluginMessage>,
        checkpoint: crate::CheckpointKind,
    },
    Error {
        message: String,
    },
}
827
/// Sink for the app-facing [`TurnActivity`] stream of a turn.
#[async_trait::async_trait]
pub trait TurnActivitySink: Send + Sync {
    /// Whether this sink discards every activity. Defaults to `false`;
    /// [`NoopTurnActivitySink`] overrides it to return `true`.
    fn is_noop(&self) -> bool {
        false
    }

    /// Delivers one activity to the sink.
    async fn emit(&self, activity: TurnActivity);
}
836
/// Turn-activity sink that discards everything it is given.
pub struct NoopTurnActivitySink;

/// Static no-op turn-activity sink for callers that need a `&dyn TurnActivitySink` default.
pub static NOOP_TURN_ACTIVITY_SINK: NoopTurnActivitySink = NoopTurnActivitySink;

#[async_trait::async_trait]
impl TurnActivitySink for NoopTurnActivitySink {
    /// Always `true`: this sink discards everything it is given.
    fn is_noop(&self) -> bool {
        true
    }

    /// Drops the activity without doing anything.
    async fn emit(&self, _activity: TurnActivity) {}
}
850
851/// Optional sinks and scoped effect controller passed to one of [`LashRuntime`]'s
852/// turn-driving entry points (`stream_turn`,
853/// `stream_turn_with_agent_frames`).
854///
855/// Construct via [`TurnOptions::new`] and chain `with_*` builders. Event sinks
856/// default to no-op sinks. Execution scope is explicit and required at every
857/// runtime boundary that can execute nondeterministic work.
858pub struct TurnOptions<'a> {
859    events: Option<&'a dyn EventSink>,
860    turn_events: Option<&'a dyn TurnActivitySink>,
861    scoped_effect_controller: ScopedEffectController<'a>,
862    cancel: CancellationToken,
863}
864
865impl<'a> TurnOptions<'a> {
866    pub fn new(
867        cancel: CancellationToken,
868        scoped_effect_controller: ScopedEffectController<'a>,
869    ) -> Self {
870        Self {
871            events: None,
872            turn_events: None,
873            scoped_effect_controller,
874            cancel,
875        }
876    }
877
878    pub fn with_events(mut self, events: &'a dyn EventSink) -> Self {
879        self.events = Some(events);
880        self
881    }
882
883    pub fn with_turn_events(mut self, turn_events: &'a dyn TurnActivitySink) -> Self {
884        self.turn_events = Some(turn_events);
885        self
886    }
887
888    pub(crate) fn events_or_noop(&self) -> &'a dyn EventSink {
889        self.events.unwrap_or(&NOOP_EVENT_SINK)
890    }
891
892    pub(crate) fn turn_events_or_noop(&self) -> &'a dyn TurnActivitySink {
893        self.turn_events.unwrap_or(&NOOP_TURN_ACTIVITY_SINK)
894    }
895
896    pub(crate) fn execution_scope_id(&self) -> &str {
897        self.scoped_effect_controller.scope_id()
898    }
899
900    pub(crate) fn scoped_effect_controller(&self) -> ScopedEffectController<'a> {
901        self.scoped_effect_controller.clone()
902    }
903}
904
/// Item of the runtime's internal event stream: either a low-level
/// [`SessionEvent`] or an app-facing [`TurnActivity`].
enum RuntimeStreamEvent {
    Session(SessionEvent),
    Turn(TurnActivity),
}
909
/// Describes the session a [`SessionStoreFactory`] is asked to create or open a
/// store for.
#[derive(Clone)]
pub struct SessionStoreCreateRequest {
    /// Id of the session the store belongs to.
    pub session_id: String,
    /// Relation of this session to other sessions; the source of
    /// [`SessionStoreCreateRequest::parent_session_id`].
    pub relation: SessionRelation,
    /// Session policy supplied with the request.
    pub policy: SessionPolicy,
}

impl SessionStoreCreateRequest {
    /// Parent session id as reported by `relation`, if any.
    pub fn parent_session_id(&self) -> Option<&str> {
        self.relation.parent_session_id()
    }
}
922
/// Factory for per-session persistence stores.
///
/// Fallible methods report failure as a `String`.
#[async_trait::async_trait]
pub trait SessionStoreFactory: Send + Sync {
    /// Durability tier the stores produced by this factory provide; defaults to
    /// [`crate::DurabilityTier::Inline`].
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Creates the store for the session described by `request`.
    async fn create_store(
        &self,
        request: &SessionStoreCreateRequest,
    ) -> Result<Arc<dyn crate::store::RuntimePersistence>, String>;

    /// Opens an already existing store for the session described by the
    /// request. The default implementation always returns `Ok(None)`.
    async fn open_existing_store(
        &self,
        _request: &SessionStoreCreateRequest,
    ) -> Result<Option<Arc<dyn crate::store::RuntimePersistence>>, String> {
        Ok(None)
    }

    /// Deletes the session identified by `session_id`.
    async fn delete_session(&self, session_id: &str) -> Result<(), String>;
}
945
/// Generic runtime for CLI or programmatic embedding.
///
/// Every field is restricted to `crate::runtime` and its submodules.
pub struct LashRuntime {
    // NOTE(review): `Option` suggests the session can be absent or taken out
    // temporarily — confirm where it is set and cleared.
    pub(in crate::runtime) session: Option<Session>,
    pub(in crate::runtime) policy: SessionPolicy,
    pub(in crate::runtime) host: RuntimeHost,
    pub(in crate::runtime) services: RuntimeServices,
    pub(in crate::runtime) state: RuntimeSessionState,
    pub(in crate::runtime) runtime_scope_id: Arc<str>,
    // Both maps are shared (`Arc`) and guarded by the async `tokio` mutex.
    // NOTE(review): the `String` keys are presumably session ids — confirm.
    pub(in crate::runtime) managed_sessions: Arc<Mutex<HashMap<String, RuntimeHandle>>>,
    pub(in crate::runtime) managed_turns: Arc<Mutex<HashMap<String, ManagedSessionTurn>>>,
    /// Protocol-owned turn options for this session.
    pub(in crate::runtime) protocol_turn_options: crate::ProtocolTurnOptions,
    /// Session-scoped token cost ledger. Shared by ALL
    /// `RuntimeSessionServices` instances created from this runtime
    /// (both per-turn and async maintenance). Entries accumulate here
    /// and are drained into `state.token_ledger` at turn-commit time.
    // Guarded by the blocking `std::sync::Mutex`, unlike the maps above.
    pub(in crate::runtime) shared_token_ledger: Arc<std::sync::Mutex<Vec<TokenLedgerEntry>>>,
    pub(in crate::runtime) process_sync_needed: Arc<AtomicBool>,
    /// Optional probe notified about turn phases.
    pub(in crate::runtime) turn_phase_probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
    /// Resident-graph policy chosen by the host. Controls whether
    /// [`LashRuntime::refresh_session_graph_from_store`] reloads the full
    /// graph or just the active path, matching the trimming behavior set at
    /// load time via [`apply_residency_on_load`](crate::runtime::apply_residency_on_load).
    pub(in crate::runtime) residency: Residency,
}