Skip to main content

lash_core/runtime/
mod.rs

1mod assembly;
2mod builder;
3pub(crate) mod causal;
4mod config_ops;
5mod effect;
6mod environment;
7mod error;
8mod host;
9mod in_memory_store;
10mod io;
11mod lifecycle;
12mod observation;
13mod process;
14mod process_work_runner;
15mod process_worker;
16mod queued_work_runner;
17mod session_api;
18mod session_manager;
19mod session_ops;
20mod state;
21#[cfg(test)]
22pub(crate) mod tests;
23mod turn_boundary;
24mod turn_commit_draft;
25mod turn_driver;
26mod turn_graph_editor;
27mod turn_loop;
28mod turn_queue;
29mod usage;
30
31use std::any::Any;
32use std::collections::HashMap;
33use std::fmt;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::atomic::{AtomicBool, Ordering};
37
38use tokio::sync::{Mutex, mpsc};
39use tokio_util::sync::CancellationToken;
40
41use crate::llm::types::{
42    LlmOutputPart, LlmProviderTraceEvent, LlmProviderTraceSender, LlmRequest, LlmResponse,
43    LlmStreamEvent, LlmUsage,
44};
45use crate::plugin::{
46    CheckpointHookContext, PrepareTurnRequest, SessionConfigChangedContext, SessionRelation,
47};
48use crate::sansio::{LlmCallError, Response};
49use crate::session_model::{
50    Message, MessageRole, Part, PartKind, PruneState, RuntimeSessionPolicy, SessionEvent,
51    SessionPolicy, TokenUsage, fresh_message_id, make_error_event, reassign_part_ids, shared_parts,
52    transport_stream_events,
53};
54use crate::{
55    CheckpointKind, PersistentRuntimeServices, PluginActionInvokeError, PromptHookContext,
56    RuntimeServices, SandboxMessage, Session, SessionCreateRequest, SessionError, SessionHandle,
57    SessionSnapshot, SessionStartPoint, ToolCallRecord, TurnFinish, TurnOutcome, TurnStop,
58};
59use crate::{Effect, TurnMachine};
60
61use host::*;
62use session_manager::*;
63use turn_boundary::*;
64use turn_commit_draft::*;
65use turn_driver::*;
66
/// Time-to-live applied to a runtime turn lease, in milliseconds (15 minutes).
pub(crate) const RUNTIME_TURN_LEASE_TTL_MS: u64 = 15 * 60 * 1_000;
68
69// `PromptUsage` is re-exported below alongside the runtime's own types.
70pub use lash_sansio::PromptUsage;
71
72use assembly::{
73    LlmDebugText, LlmDebugToolCall, LlmStreamAccumulator, LlmStreamDebugState, LlmStreamEventLog,
74    LlmStreamState, LlmStreamSummary, TurnAssembler,
75};
76#[cfg(test)]
77#[allow(unused_imports)]
78use assembly::{classify_output_state, sanitize_assistant_output};
79pub use builder::EmbeddedRuntimeBuilder;
80pub use causal::process_event_invocation;
81pub(crate) use causal::tool_retry_sleep_invocation;
82pub(crate) use effect::RuntimeEffectControllerHandle;
83pub use effect::{
84    CausalRef, EffectHost, EffectScope, InlineEffectHost, InlineRuntimeEffectController,
85    LlmAttachmentSpec, LlmRequestSpec, ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand,
86    RuntimeEffectController, RuntimeEffectControllerError, RuntimeEffectEnvelope,
87    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
88    RuntimeReplay, RuntimeScope, RuntimeSubject, ScopedEffectController,
89};
90pub use environment::{ParkedSession, Residency, RuntimeEnvironment, RuntimeEnvironmentBuilder};
91pub use error::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
92pub use host::{EmbeddedRuntimeHost, ProcessRuntimeHost, RuntimeHostConfig};
93pub use in_memory_store::{InMemorySessionStore, InMemorySessionStoreFactory};
94use io::normalize_input_items;
95pub use observation::{RuntimeHandle, RuntimeObservation};
96#[cfg(any(test, feature = "testing"))]
97pub use process::TestLocalProcessRegistry;
98pub use process::{
99    DefaultProcessCancelAbility, ObservedProcess, ObservedProcessEvent, ObservedWorkItem,
100    PROCESS_LEASE_SCHEMA_VERSION, PreparedProcessEventAppend, ProcessAwaitOutput,
101    ProcessCancelAbility, ProcessCancelAllRequest, ProcessCancelRequest, ProcessCancelSource,
102    ProcessCancelSummary, ProcessDefinitionSelector, ProcessDefinitionSummary, ProcessEvent,
103    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessEventSemantics,
104    ProcessEventSemanticsSpec, ProcessEventType, ProcessExecutionContext, ProcessExternalRef,
105    ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry, ProcessHandleSummary,
106    ProcessId, ProcessInput, ProcessLease, ProcessLeaseCompletion, ProcessLifecycleStatus,
107    ProcessListFilter, ProcessListMode, ProcessOpScope, ProcessProvenance, ProcessRecord,
108    ProcessRegistration, ProcessRegistry, ProcessScope, ProcessScopeId, ProcessService,
109    ProcessSessionDeleteReport, ProcessStartGrant, ProcessStartOptions, ProcessStartRequest,
110    ProcessStatus, ProcessStatusFilter, ProcessTerminalSemantics, ProcessTerminalSpec,
111    ProcessTerminalState, ProcessValueSelector, ProcessWake, ProcessWakeDedupeKey,
112    ProcessWakeDelivery, ProcessWakeSpec, ProcessWorkObserver, ProcessWorkSnapshot,
113    UnavailableProcessService, current_epoch_ms, epoch_ms_from_system_time,
114    lashlang_process_event_types, materialize_process_event_semantics,
115    prepare_process_event_append, prepare_process_registration, process_event_payload_hash,
116    process_wake_delivery, process_wake_input_from_event_payload, process_wake_turn_cause,
117    process_wake_turn_text, require_event_replay, system_time_from_epoch_ms,
118};
119pub use process_work_runner::{
120    InlineProcessRunHandle, ProcessRunHandle, ProcessWorkDriver, ProcessWorkPoke, ProcessWorkRunner,
121};
122pub use process_worker::{DurableProcessWorker, DurableProcessWorkerConfig};
123pub use queued_work_runner::{
124    QueuedWorkPoke, QueuedWorkRunHandle, QueuedWorkRunOutcome, QueuedWorkRunRequest,
125    QueuedWorkRunner,
126};
127pub use session_manager::DirectCompletionClient;
128pub use state::RuntimeSessionState;
129use state::{
130    append_session_nodes_to_state, apply_residency_on_load, apply_session_checkpoint,
131    apply_session_head, normalize_session_graph,
132};
133pub use turn_loop::ensure_durable_effect_input;
134pub use turn_queue::{
135    DeliveryPolicy, MergeKey, QUEUED_WORK_CLAIM_TTL_MS, QueuedCheckpointWork, QueuedTurnWork,
136    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
137    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, SessionCommand, SessionCommandReceipt,
138    SlotPolicy, process_wake_batch_draft,
139};
140pub use usage::{
141    SessionUsageReport, TokenLedgerEntry, UsageReportRow, UsageTotals, diff_token_ledger,
142    diff_usage_reports,
143};
144use usage::{merge_ledger_entry, merge_usage_delta_entries, normalize_prompt_usage};
145
/// Named phases of a runtime turn, as reported to a [`RuntimeTurnPhaseProbe`].
///
/// NOTE(review): the variants look like they are declared in execution order;
/// confirm against the turn driver before relying on that.
#[doc(hidden)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub enum RuntimeTurnPhase {
    ContextTransform,
    BeforeTurnHooks,
    PromptBuild,
    EffectLoop,
    FinalizeTurn,
    PersistTurn,
    FinalCommit,
    PostPersistHooks,
}
158
159#[doc(hidden)]
160pub trait RuntimeTurnPhaseProbe: Send + Sync {
161    fn begin(&self, phase: RuntimeTurnPhase);
162    fn end(&self, phase: RuntimeTurnPhase);
163    fn begin_named(&self, _phase: &str) {}
164    fn end_named(&self, _phase: &str) {}
165}
166
167/// Host-provided per-turn input.
168#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
169#[serde(tag = "type", rename_all = "snake_case")]
170pub enum InputItem {
171    Text { text: String },
172    ImageRef { id: String },
173}
174
175impl InputItem {
176    pub fn text(text: impl Into<String>) -> Self {
177        Self::Text { text: text.into() }
178    }
179
180    pub fn image_ref(id: impl Into<String>) -> Self {
181        Self::ImageRef { id: id.into() }
182    }
183}
184
185/// Host-provided per-turn input.
186#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
187pub struct TurnInput {
188    pub items: Vec<InputItem>,
189    #[serde(default)]
190    pub image_blobs: HashMap<String, Vec<u8>>,
191    /// Per-turn override for protocol-owned turn options.
192    #[serde(default, skip_serializing_if = "Option::is_none")]
193    pub protocol_turn_options: Option<crate::ProtocolTurnOptions>,
194    /// Optional externally-stable trace turn id. Normal runtime callers leave
195    /// this empty and the runtime generates one per outer turn.
196    #[serde(default, skip_serializing_if = "Option::is_none")]
197    pub trace_turn_id: Option<String>,
198    #[serde(skip)]
199    pub protocol_extension: Option<ProtocolTurnExtensionHandle>,
200    #[serde(skip)]
201    pub turn_context: TurnContext,
202}
203
204impl TurnInput {
205    pub fn empty() -> Self {
206        Self::items(std::iter::empty())
207    }
208
209    pub fn text(text: impl Into<String>) -> Self {
210        Self::items([InputItem::text(text)])
211    }
212
213    pub fn items(items: impl IntoIterator<Item = InputItem>) -> Self {
214        Self {
215            items: items.into_iter().collect(),
216            image_blobs: HashMap::new(),
217            protocol_turn_options: None,
218            trace_turn_id: None,
219            protocol_extension: None,
220            turn_context: TurnContext::default(),
221        }
222    }
223
224    pub fn with_image_blob(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
225        self.image_blobs.insert(id.into(), bytes);
226        self
227    }
228
229    pub fn with_image_blobs<I, K>(mut self, image_blobs: I) -> Self
230    where
231        I: IntoIterator<Item = (K, Vec<u8>)>,
232        K: Into<String>,
233    {
234        self.image_blobs.extend(
235            image_blobs
236                .into_iter()
237                .map(|(id, bytes)| (id.into(), bytes)),
238        );
239        self
240    }
241
242    pub fn with_image_ref(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
243        let id = id.into();
244        self.items.push(InputItem::image_ref(id.clone()));
245        self.image_blobs.insert(id, bytes);
246        self
247    }
248
249    pub fn with_protocol_turn_options(mut self, options: crate::ProtocolTurnOptions) -> Self {
250        self.protocol_turn_options = Some(options);
251        self
252    }
253
254    pub fn with_trace_turn_id(mut self, trace_turn_id: impl Into<String>) -> Self {
255        self.trace_turn_id = Some(trace_turn_id.into());
256        self
257    }
258}
259
/// In-process, per-turn side channel carrying typed inputs for plugins.
///
/// The map holds live Rust values behind `Any`, keyed by plugin id, for the
/// duration of a single turn. It is intentionally **not** serializable: these
/// values cannot cross a process boundary, so durable effect-host runs reject
/// any turn that carries live inputs (see
/// [`LiveTurnInputs::durable_effect_rejection`]). Durable callers should put
/// replayable data in `protocol_turn_options` or persisted plugin state
/// instead.
#[derive(Default, Clone)]
pub struct LiveTurnInputs {
    /// Type-erased inputs, one per plugin id.
    inputs: HashMap<&'static str, Arc<dyn Any + Send + Sync>>,
}
273
274impl LiveTurnInputs {
275    fn insert<T>(&mut self, plugin_id: &'static str, input: T)
276    where
277        T: Send + Sync + 'static,
278    {
279        self.inputs.insert(plugin_id, Arc::new(input));
280    }
281
282    fn get<T>(&self, plugin_id: &'static str) -> Option<&T>
283    where
284        T: 'static,
285    {
286        self.inputs
287            .get(plugin_id)
288            .and_then(|input| input.downcast_ref::<T>())
289    }
290
291    fn contains(&self, plugin_id: &'static str) -> bool {
292        self.inputs.contains_key(plugin_id)
293    }
294
295    fn is_empty(&self) -> bool {
296        self.inputs.is_empty()
297    }
298
299    pub fn plugin_ids(&self) -> Vec<&'static str> {
300        self.inputs.keys().copied().collect()
301    }
302
303    /// Returns an error when live per-turn inputs would make a durable effect
304    /// host replay depend on process-local values.
305    pub(crate) fn durable_effect_rejection(&self) -> Result<(), RuntimeError> {
306        if self.is_empty() {
307            return Ok(());
308        }
309        Err(RuntimeError::new(
310            RuntimeErrorCode::DurableEffectLivePluginInput,
311            "durable effect hosts do not support live TurnContext plugin inputs; encode replayable data in protocol_turn_options or persisted plugin state",
312        ))
313    }
314}
315
316#[derive(Clone, Default)]
317pub struct TurnContext {
318    plugin_inputs: LiveTurnInputs,
319    provider: Option<crate::ProviderHandle>,
320    model: Option<crate::ModelSpec>,
321    prompt: crate::PromptLayer,
322}
323
324impl TurnContext {
325    pub fn new() -> Self {
326        Self::default()
327    }
328
329    pub fn insert_plugin_input<T>(&mut self, plugin_id: &'static str, input: T)
330    where
331        T: Send + Sync + 'static,
332    {
333        self.plugin_inputs.insert(plugin_id, input);
334    }
335
336    pub fn set_provider(&mut self, provider: crate::ProviderHandle) {
337        self.provider = Some(provider);
338    }
339
340    pub fn provider(&self) -> Option<&crate::ProviderHandle> {
341        self.provider.as_ref()
342    }
343
344    pub fn set_model(&mut self, model: crate::ModelSpec) {
345        self.model = Some(model);
346    }
347
348    pub fn model_spec(&self) -> Option<&crate::ModelSpec> {
349        self.model.as_ref()
350    }
351
352    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
353    where
354        T: 'static,
355    {
356        self.plugin_inputs.get(plugin_id)
357    }
358
359    pub fn has_plugin_input(&self, plugin_id: &'static str) -> bool {
360        self.plugin_inputs.contains(plugin_id)
361    }
362
363    pub fn has_live_plugin_inputs(&self) -> bool {
364        !self.plugin_inputs.is_empty()
365    }
366
367    pub fn live_plugin_input_ids(&self) -> Vec<&'static str> {
368        self.plugin_inputs.plugin_ids()
369    }
370
371    /// Live plugin inputs for this turn. The durable boundary inspects this to
372    /// reject turns carrying non-serializable live state.
373    pub(crate) fn live_plugin_inputs(&self) -> &LiveTurnInputs {
374        &self.plugin_inputs
375    }
376
377    pub fn set_prompt_template(&mut self, template: crate::PromptTemplate) {
378        self.prompt.template = Some(template);
379    }
380
381    pub fn add_prompt_contribution(&mut self, contribution: crate::PromptContribution) {
382        self.prompt.add_contribution(contribution);
383    }
384
385    pub fn replace_prompt_slot(
386        &mut self,
387        slot: crate::PromptSlot,
388        contributions: impl IntoIterator<Item = crate::PromptContribution>,
389    ) {
390        self.prompt.replace_slot(slot, contributions);
391    }
392
393    pub fn clear_prompt_slot(&mut self, slot: crate::PromptSlot) {
394        self.prompt.clear_slot(slot);
395    }
396
397    pub fn set_prompt_layer(&mut self, prompt: crate::PromptLayer) {
398        self.prompt = prompt;
399    }
400
401    pub fn prompt_layer(&self) -> &crate::PromptLayer {
402        &self.prompt
403    }
404}
405
406impl fmt::Debug for TurnContext {
407    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
408        f.debug_struct("TurnContext")
409            .field("plugin_inputs", &self.plugin_inputs.plugin_ids())
410            .field("has_provider", &self.provider.is_some())
411            .field("has_model", &self.model.is_some())
412            .field("has_prompt_layer", &(!self.prompt.is_empty()))
413            .finish()
414    }
415}
416
417#[derive(Clone)]
418pub struct ProtocolTurnExtensionHandle(Arc<dyn ProtocolTurnExtension>);
419
420impl ProtocolTurnExtensionHandle {
421    pub fn new(extension: impl ProtocolTurnExtension + 'static) -> Self {
422        Self(Arc::new(extension))
423    }
424
425    pub fn as_any(&self) -> &dyn Any {
426        self.0.as_any()
427    }
428
429    pub fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
430        self.0.prompt_contributions()
431    }
432}
433
434impl fmt::Debug for ProtocolTurnExtensionHandle {
435    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
436        f.write_str("ProtocolTurnExtensionHandle(..)")
437    }
438}
439
440pub trait ProtocolTurnExtension: Send + Sync {
441    fn as_any(&self) -> &dyn Any;
442
443    fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
444        Vec::new()
445    }
446}
447
448#[derive(Clone)]
449pub struct ProtocolSessionExtensionHandle(Arc<dyn ProtocolSessionExtension>);
450
451impl ProtocolSessionExtensionHandle {
452    pub fn new(extension: impl ProtocolSessionExtension + 'static) -> Self {
453        Self(Arc::new(extension))
454    }
455
456    pub fn as_any(&self) -> &dyn Any {
457        self.0.as_any()
458    }
459}
460
461impl fmt::Debug for ProtocolSessionExtensionHandle {
462    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463        f.write_str("ProtocolSessionExtensionHandle(..)")
464    }
465}
466
/// Protocol-defined extension attached to a session.
pub trait ProtocolSessionExtension: Send + Sync {
    /// Returns `self` as `&dyn Any` so holders of the trait object can
    /// downcast to the concrete extension type.
    fn as_any(&self) -> &dyn Any;
}
470
471#[derive(Clone, Debug)]
472pub(super) enum NormalizedItem {
473    Text(String),
474    Image(crate::AttachmentRef),
475}
476
477/// Canonical assistant output payload.
478#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
479pub struct AssistantOutput {
480    pub safe_text: String,
481    pub raw_text: String,
482    pub state: OutputState,
483}
484
485/// Quality and usability of assembled terminal output.
486#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
487#[serde(rename_all = "snake_case")]
488pub enum OutputState {
489    Usable,
490    EmptyOutput,
491    TracebackOnly,
492    RecoveredFromError,
493}
494
495/// RLM code execution output observed during a turn.
496#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
497pub struct CodeOutputRecord {
498    pub output: String,
499    #[serde(default, skip_serializing_if = "Option::is_none")]
500    pub error: Option<String>,
501}
502
503/// High-level execution summary for a completed turn.
504#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
505pub struct ExecutionSummary {
506    #[serde(default)]
507    pub had_tool_calls: bool,
508    #[serde(default)]
509    pub had_code_execution: bool,
510}
511
512/// Structured issue surfaced during turn execution.
513#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
514pub struct TurnIssue {
515    pub kind: String,
516    #[serde(default, skip_serializing_if = "Option::is_none")]
517    pub code: Option<String>,
518    #[serde(default, skip_serializing_if = "Option::is_none")]
519    pub terminal_reason: Option<crate::LlmTerminalReason>,
520    pub message: String,
521    #[serde(default, skip_serializing_if = "Option::is_none")]
522    pub raw: Option<String>,
523}
524
525/// Canonical high-level turn result returned to hosts.
526#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
527pub struct AssembledTurn {
528    pub state: SessionSnapshot,
529    pub outcome: crate::TurnOutcome,
530    pub assistant_output: AssistantOutput,
531    pub execution: ExecutionSummary,
532    #[serde(default)]
533    pub token_usage: TokenUsage,
534    /// Per-(session, source, model) ledger entries for child sessions whose
535    /// LLM calls completed during this turn. `token_usage` above is the
536    /// parent's own LLM tokens; `total_usage` (on the embed-facing
537    /// `TurnResult`) sums both.
538    #[serde(default)]
539    pub children_usage: Vec<TokenLedgerEntry>,
540    #[serde(default)]
541    pub tool_calls: Vec<ToolCallRecord>,
542    #[serde(default)]
543    pub errors: Vec<TurnIssue>,
544}
545
546/// Result of driving one logical host turn through any AgentFrame switches.
547///
548/// A frame switch is an internal runtime continuation, similar to compaction
549/// from a host's perspective. Callers that need a final answer can use
550/// [`LashRuntime::stream_turn_with_agent_frames`] and inspect `final_turn()`.
551#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
552pub struct AgentFrameRun {
553    pub turns: Vec<AssembledTurn>,
554}
555
556impl AgentFrameRun {
557    pub fn final_turn(&self) -> Option<&AssembledTurn> {
558        self.turns.last()
559    }
560
561    pub fn into_final_turn(mut self) -> Option<AssembledTurn> {
562        self.turns.pop()
563    }
564
565    pub fn frame_switch_count(&self) -> usize {
566        self.turns
567            .iter()
568            .filter(|turn| matches!(turn.outcome, crate::TurnOutcome::AgentFrameSwitch { .. }))
569            .count()
570    }
571}
572
573/// Termination policy knobs.
574#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
575pub struct TerminationPolicy {
576    #[serde(default)]
577    pub treat_missing_done_as_failure: bool,
578}
579
580impl Default for TerminationPolicy {
581    fn default() -> Self {
582        Self {
583            treat_missing_done_as_failure: true,
584        }
585    }
586}
587
588/// Host event sink for low-level streaming runtime events.
589/// `SessionEvent` is protocol-specific preview/progress data.
590#[async_trait::async_trait]
591pub trait EventSink: Send + Sync {
592    fn is_noop(&self) -> bool {
593        false
594    }
595
596    async fn emit(&self, event: SessionEvent);
597}
598
/// Event sink that drops every event; handy when only the final state matters.
pub struct NoopEventSink;

/// Shared no-op event sink, usable wherever a default `&dyn EventSink` is needed.
pub static NOOP_EVENT_SINK: NoopEventSink = NoopEventSink;
604
605#[async_trait::async_trait]
606impl EventSink for NoopEventSink {
607    fn is_noop(&self) -> bool {
608        true
609    }
610
611    async fn emit(&self, _event: SessionEvent) {}
612}
613
614/// Stable identifier for a semantic turn activity.
615#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
616#[serde(transparent)]
617pub struct TurnActivityId(pub String);
618
619impl TurnActivityId {
620    pub fn new(id: impl Into<String>) -> Self {
621        Self(id.into())
622    }
623
624    pub fn fresh() -> Self {
625        Self(uuid::Uuid::new_v4().to_string())
626    }
627}
628
629/// App-facing semantic activity emitted during a turn.
630///
631/// `id` is unique per emitted activity event. `correlation_id` groups related
632/// events in the same logical activity, such as code start/completion, tool
633/// start/completion, or text deltas from one output block.
634#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
635pub struct TurnActivity {
636    pub id: TurnActivityId,
637    pub correlation_id: TurnActivityId,
638    #[serde(flatten)]
639    pub event: TurnEvent,
640}
641
642impl TurnActivity {
643    pub fn new(correlation_id: TurnActivityId, event: TurnEvent) -> Self {
644        Self {
645            id: TurnActivityId::fresh(),
646            correlation_id,
647            event,
648        }
649    }
650
651    pub fn independent(event: TurnEvent) -> Self {
652        let correlation_id = TurnActivityId::fresh();
653        Self::new(correlation_id, event)
654    }
655}
656
657/// App-facing semantic event payload for a turn activity.
658///
659/// Unlike [`SessionEvent`], these events are stable application signals rather
660/// than low-level runtime/debug events. Public streams carry these payloads
661/// inside [`TurnActivity`] so every emitted item has identity.
662#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
663#[serde(tag = "type", rename_all = "snake_case")]
664#[allow(clippy::large_enum_variant)]
665pub enum TurnEvent {
666    QueuedWorkStarted {
667        boundary: crate::QueuedWorkClaimBoundary,
668        batch_ids: Vec<String>,
669        causes: Vec<crate::TurnCause>,
670    },
671    ModelRequestStarted {
672        protocol_iteration: usize,
673    },
674    AssistantProseDelta {
675        text: String,
676    },
677    ReasoningDelta {
678        text: String,
679    },
680    CodeBlockStarted {
681        language: String,
682        code: String,
683        #[serde(default, skip_serializing_if = "Option::is_none")]
684        graph_key: Option<String>,
685    },
686    CodeBlockCompleted {
687        language: String,
688        output: String,
689        error: Option<String>,
690        success: bool,
691        duration_ms: u64,
692        tool_call_ids: Vec<String>,
693        #[serde(default, skip_serializing_if = "Option::is_none")]
694        graph_key: Option<String>,
695    },
696    ToolCallStarted {
697        call_id: Option<String>,
698        name: String,
699        args: serde_json::Value,
700    },
701    ToolCallCompleted {
702        call_id: Option<String>,
703        name: String,
704        args: serde_json::Value,
705        output: crate::ToolCallOutput,
706        duration_ms: u64,
707    },
708    SubmittedValue {
709        value: serde_json::Value,
710    },
711    ToolValue {
712        tool_name: String,
713        value: serde_json::Value,
714    },
715    Usage {
716        protocol_iteration: usize,
717        usage: TokenUsage,
718        cumulative: TokenUsage,
719    },
720    ChildUsage {
721        session_id: String,
722        source: String,
723        model: String,
724        protocol_iteration: usize,
725        usage: TokenUsage,
726        cumulative: TokenUsage,
727    },
728    RetryStatus {
729        wait_seconds: u64,
730        attempt: usize,
731        max_attempts: usize,
732        reason: String,
733    },
734    PluginRuntime {
735        plugin_id: String,
736        event: crate::PluginRuntimeEvent,
737    },
738    QueuedInputAccepted {
739        checkpoint: crate::CheckpointKind,
740        inputs: Vec<crate::AcceptedInjectedTurnInput>,
741    },
742    QueuedMessagesCommitted {
743        messages: Vec<crate::PluginMessage>,
744        checkpoint: crate::CheckpointKind,
745    },
746    Error {
747        message: String,
748    },
749}
750
751#[async_trait::async_trait]
752pub trait TurnActivitySink: Send + Sync {
753    fn is_noop(&self) -> bool {
754        false
755    }
756
757    async fn emit(&self, activity: TurnActivity);
758}
759
/// Turn-activity sink that drops every activity.
pub struct NoopTurnActivitySink;

/// Shared no-op turn-activity sink, usable wherever a default
/// `&dyn TurnActivitySink` is needed.
pub static NOOP_TURN_ACTIVITY_SINK: NoopTurnActivitySink = NoopTurnActivitySink;
764
765#[async_trait::async_trait]
766impl TurnActivitySink for NoopTurnActivitySink {
767    fn is_noop(&self) -> bool {
768        true
769    }
770
771    async fn emit(&self, _activity: TurnActivity) {}
772}
773
774/// Optional sinks and scoped effect controller passed to one of [`LashRuntime`]'s
775/// turn-driving entry points (`stream_turn`,
776/// `stream_turn_with_agent_frames`).
777///
778/// Construct via [`TurnOptions::new`] and chain `with_*` builders. Event sinks
779/// default to no-op sinks. Effect scope is explicit: callers that are already
780/// executing under an effect host must pass
781/// [`TurnOptions::with_scoped_effect_controller`]; host-local entry points
782/// construct a host-configured scope at their boundary.
783pub struct TurnOptions<'a> {
784    events: Option<&'a dyn EventSink>,
785    turn_events: Option<&'a dyn TurnActivitySink>,
786    scoped_effect_controller: Option<ScopedEffectController<'a>>,
787    cancel: CancellationToken,
788}
789
790impl<'a> TurnOptions<'a> {
791    pub fn new(cancel: CancellationToken) -> Self {
792        Self {
793            events: None,
794            turn_events: None,
795            scoped_effect_controller: None,
796            cancel,
797        }
798    }
799
800    pub fn with_events(mut self, events: &'a dyn EventSink) -> Self {
801        self.events = Some(events);
802        self
803    }
804
805    pub fn with_turn_events(mut self, turn_events: &'a dyn TurnActivitySink) -> Self {
806        self.turn_events = Some(turn_events);
807        self
808    }
809
810    pub fn with_scoped_effect_controller(
811        mut self,
812        scoped_effect_controller: ScopedEffectController<'a>,
813    ) -> Self {
814        self.scoped_effect_controller = Some(scoped_effect_controller);
815        self
816    }
817
818    pub(crate) fn events_or_noop(&self) -> &'a dyn EventSink {
819        self.events.unwrap_or(&NOOP_EVENT_SINK)
820    }
821
822    pub(crate) fn turn_events_or_noop(&self) -> &'a dyn TurnActivitySink {
823        self.turn_events.unwrap_or(&NOOP_TURN_ACTIVITY_SINK)
824    }
825
826    pub(crate) fn effect_scope_id(&self) -> Option<&str> {
827        self.scoped_effect_controller
828            .as_ref()
829            .map(ScopedEffectController::scope_id)
830    }
831
832    pub(crate) fn scoped_effect_controller(&self) -> Option<ScopedEffectController<'a>> {
833        self.scoped_effect_controller.clone()
834    }
835}
836
837enum RuntimeStreamEvent {
838    Session(SessionEvent),
839    Turn(TurnActivity),
840}
841
842#[derive(Clone)]
843pub struct SessionStoreCreateRequest {
844    pub session_id: String,
845    pub relation: SessionRelation,
846    pub policy: SessionPolicy,
847}
848
849impl SessionStoreCreateRequest {
850    pub fn parent_session_id(&self) -> Option<&str> {
851        self.relation.parent_session_id()
852    }
853}
854
855#[async_trait::async_trait]
856pub trait SessionStoreFactory: Send + Sync {
857    /// Durability tier the stores produced by this factory provide; defaults to
858    /// [`DurabilityTier::Inline`].
859    fn durability_tier(&self) -> crate::DurabilityTier {
860        crate::DurabilityTier::Inline
861    }
862
863    async fn create_store(
864        &self,
865        request: &SessionStoreCreateRequest,
866    ) -> Result<Arc<dyn crate::store::RuntimePersistence>, String>;
867
868    async fn delete_session(&self, session_id: &str) -> Result<(), String>;
869}
870
871/// Generic runtime for CLI or programmatic embedding.
872pub struct LashRuntime {
873    pub(in crate::runtime) session: Option<Session>,
874    pub(in crate::runtime) policy: SessionPolicy,
875    pub(in crate::runtime) host: RuntimeHost,
876    pub(in crate::runtime) services: RuntimeServices,
877    pub(in crate::runtime) state: RuntimeSessionState,
878    pub(in crate::runtime) runtime_scope_id: Arc<str>,
879    pub(in crate::runtime) managed_sessions: Arc<Mutex<HashMap<String, RuntimeHandle>>>,
880    pub(in crate::runtime) managed_turns: Arc<Mutex<HashMap<String, ManagedSessionTurn>>>,
881    /// Protocol-owned turn options for this session.
882    pub(in crate::runtime) protocol_turn_options: crate::ProtocolTurnOptions,
883    /// Session-scoped token cost ledger. Shared by ALL
884    /// `RuntimeSessionServices` instances created from this runtime
885    /// (both per-turn and async maintenance). Entries accumulate here
886    /// and are drained into `state.token_ledger` at turn-commit time.
887    pub(in crate::runtime) shared_token_ledger: Arc<std::sync::Mutex<Vec<TokenLedgerEntry>>>,
888    pub(in crate::runtime) process_sync_needed: Arc<AtomicBool>,
889    pub(in crate::runtime) turn_phase_probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
890    /// Resident-graph policy chosen by the host. Controls whether
891    /// [`LashRuntime::refresh_session_graph_from_store`] reloads the full
892    /// graph or just the active path, matching the trimming behavior set at
893    /// load time via [`apply_residency_on_load`](crate::runtime::apply_residency_on_load).
894    pub(in crate::runtime) residency: Residency,
895}