Skip to main content

lash_core/runtime/
mod.rs

1mod assembly;
2mod builder;
3pub(crate) mod causal;
4mod config_ops;
5mod effect;
6mod environment;
7mod error;
8mod host;
9mod in_memory_store;
10mod io;
11mod lifecycle;
12mod observation;
13mod process;
14mod process_work_runner;
15mod process_worker;
16mod queued_work_runner;
17mod session_api;
18mod session_manager;
19mod session_ops;
20mod state;
21#[cfg(test)]
22pub(crate) mod tests;
23mod turn_boundary;
24mod turn_commit_draft;
25mod turn_driver;
26mod turn_graph_editor;
27mod turn_loop;
28mod turn_queue;
29mod usage;
30
31use std::any::Any;
32use std::collections::HashMap;
33use std::fmt;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::atomic::{AtomicBool, Ordering};
37
38use tokio::sync::{Mutex, mpsc};
39use tokio_util::sync::CancellationToken;
40
41use crate::llm::types::{
42    LlmOutputPart, LlmProviderTraceEvent, LlmProviderTraceSender, LlmRequest, LlmResponse,
43    LlmStreamEvent, LlmUsage,
44};
45use crate::plugin::{
46    CheckpointHookContext, PrepareTurnRequest, SessionConfigChangedContext, SessionRelation,
47};
48use crate::sansio::{LlmCallError, Response};
49use crate::session_model::{
50    Message, MessageRole, Part, PartKind, PruneState, RuntimeSessionPolicy, SessionEvent,
51    SessionPolicy, TokenUsage, fresh_message_id, make_error_event, reassign_part_ids, shared_parts,
52    transport_stream_events,
53};
54use crate::{
55    CheckpointKind, PersistentRuntimeServices, PluginActionInvokeError, PromptHookContext,
56    RuntimeServices, SandboxMessage, Session, SessionCreateRequest, SessionError, SessionHandle,
57    SessionSnapshot, SessionStartPoint, ToolCallRecord, TurnFinish, TurnOutcome, TurnStop,
58};
59use crate::{Effect, TurnMachine};
60
61use host::*;
62use session_manager::*;
63use turn_boundary::*;
64use turn_commit_draft::*;
65use turn_driver::*;
66
67pub(crate) const RUNTIME_TURN_LEASE_TTL_MS: u64 = 15 * 60 * 1000;
68
69// `PromptUsage` is re-exported below alongside the runtime's own types.
70pub use lash_sansio::PromptUsage;
71
72use assembly::{
73    LlmDebugText, LlmDebugToolCall, LlmStreamAccumulator, LlmStreamDebugState, LlmStreamEventLog,
74    LlmStreamState, LlmStreamSummary, TurnAssembler,
75};
76#[cfg(test)]
77#[allow(unused_imports)]
78use assembly::{classify_output_state, sanitize_assistant_output};
79pub use builder::EmbeddedRuntimeBuilder;
80pub use causal::process_event_invocation;
81pub(crate) use causal::tool_retry_sleep_invocation;
82pub(crate) use effect::RuntimeEffectControllerHandle;
83pub use effect::{
84    CausalRef, EffectHost, EffectScope, InlineEffectHost, InlineRuntimeEffectController,
85    LlmAttachmentSpec, LlmRequestSpec, ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand,
86    RuntimeEffectController, RuntimeEffectControllerError, RuntimeEffectEnvelope,
87    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
88    RuntimeReplay, RuntimeScope, RuntimeSubject, ScopedEffectController,
89};
90pub use environment::{ParkedSession, Residency, RuntimeEnvironment, RuntimeEnvironmentBuilder};
91pub use error::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
92pub use host::{EmbeddedRuntimeHost, ProcessRuntimeHost, RuntimeHostConfig};
93pub use in_memory_store::{InMemorySessionStore, InMemorySessionStoreFactory};
94use io::normalize_input_items;
95pub use observation::{
96    InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
97    LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
98    LiveReplaySubscription, RuntimeHandle, RuntimeObservation, SessionCursor, SessionCursorError,
99    SessionObservation, SessionObservationEvent, SessionObservationEventPayload,
100    SessionObservationSubscription, SessionProcessEventKind, SessionQueueEventKind, SessionResume,
101    SessionRevision,
102};
103#[cfg(any(test, feature = "testing"))]
104pub use process::TestLocalProcessRegistry;
105pub use process::{
106    DefaultProcessCancelAbility, ObservedProcess, ObservedProcessEvent, ObservedWorkItem,
107    PROCESS_LEASE_SCHEMA_VERSION, PreparedProcessEventAppend, ProcessAwaitOutput,
108    ProcessCancelAbility, ProcessCancelAllRequest, ProcessCancelRequest, ProcessCancelSource,
109    ProcessCancelSummary, ProcessDefinitionSelector, ProcessDefinitionSummary, ProcessEvent,
110    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessEventSemantics,
111    ProcessEventSemanticsSpec, ProcessEventType, ProcessExecutionContext, ProcessExecutionEnvRef,
112    ProcessExecutionEnvSpec, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
113    ProcessHandleGrantEntry, ProcessHandleSummary, ProcessId, ProcessInput, ProcessLease,
114    ProcessLeaseCompletion, ProcessLifecycleStatus, ProcessListFilter, ProcessListMode,
115    ProcessOpScope, ProcessOriginator, ProcessProvenance, ProcessRecord, ProcessRegistration,
116    ProcessRegistry, ProcessService, ProcessSessionDeleteReport, ProcessStartGrant,
117    ProcessStartOptions, ProcessStartRequest, ProcessStatus, ProcessStatusFilter,
118    ProcessTerminalSemantics, ProcessTerminalSpec, ProcessTerminalState, ProcessValueSelector,
119    ProcessWake, ProcessWakeDedupeKey, ProcessWakeDelivery, ProcessWakeSpec, ProcessWorkObserver,
120    ProcessWorkSnapshot, SessionScope, SessionScopeId, UnavailableProcessService, WaitKind,
121    WaitState, current_epoch_ms, epoch_ms_from_system_time, lashlang_process_event_types,
122    lashlang_process_signal_event_types, load_process_execution_env,
123    materialize_process_event_semantics, persist_process_execution_env,
124    prepare_process_event_append, prepare_process_registration, process_event_payload_hash,
125    process_signal_event_type, process_signal_name_from_event_type, process_signal_wait_key,
126    process_wake_delivery, process_wake_input_from_event_payload, process_wake_turn_cause,
127    process_wake_turn_text, require_event_replay, system_time_from_epoch_ms,
128    validate_process_signal_name,
129};
130pub use process_work_runner::{
131    InlineProcessRunHandle, ProcessRunHandle, ProcessWorkDriver, ProcessWorkPoke, ProcessWorkRunner,
132};
133pub use process_worker::{DurableProcessWorker, DurableProcessWorkerConfig};
134pub use queued_work_runner::{
135    QueuedWorkPoke, QueuedWorkRunHandle, QueuedWorkRunOutcome, QueuedWorkRunRequest,
136    QueuedWorkRunner,
137};
138pub use session_manager::DirectCompletionClient;
139pub use state::RuntimeSessionState;
140use state::{
141    append_session_nodes_to_state, apply_residency_on_load, apply_session_checkpoint,
142    apply_session_head, normalize_session_graph, open_agent_frame_in_state,
143};
144pub use turn_loop::ensure_durable_effect_input;
145pub use turn_queue::{
146    DeliveryPolicy, MergeKey, QUEUED_WORK_CLAIM_TTL_MS, QueuedCheckpointWork, QueuedTurnWork,
147    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
148    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, SessionCommand, SessionCommandReceipt,
149    SlotPolicy, process_wake_batch_draft,
150};
151pub use usage::{
152    SessionUsageReport, TokenLedgerEntry, UsageReportRow, UsageTotals, diff_token_ledger,
153    diff_usage_reports,
154};
155use usage::{merge_ledger_entry, merge_usage_delta_entries, normalize_prompt_usage};
156
157#[doc(hidden)]
158#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
159pub enum RuntimeTurnPhase {
160    ContextTransform,
161    BeforeTurnHooks,
162    PromptBuild,
163    EffectLoop,
164    FinalizeTurn,
165    PersistTurn,
166    FinalCommit,
167    PostPersistHooks,
168}
169
170#[doc(hidden)]
171pub trait RuntimeTurnPhaseProbe: Send + Sync {
172    fn begin(&self, phase: RuntimeTurnPhase);
173    fn end(&self, phase: RuntimeTurnPhase);
174    fn begin_named(&self, _phase: &str) {}
175    fn end_named(&self, _phase: &str) {}
176}
177
178/// Host-provided per-turn input.
179#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
180#[serde(tag = "type", rename_all = "snake_case")]
181pub enum InputItem {
182    Text { text: String },
183    ImageRef { id: String },
184}
185
186impl InputItem {
187    pub fn text(text: impl Into<String>) -> Self {
188        Self::Text { text: text.into() }
189    }
190
191    pub fn image_ref(id: impl Into<String>) -> Self {
192        Self::ImageRef { id: id.into() }
193    }
194}
195
196/// Host-provided per-turn input.
197#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
198pub struct TurnInput {
199    pub items: Vec<InputItem>,
200    #[serde(default)]
201    pub image_blobs: HashMap<String, Vec<u8>>,
202    /// Per-turn override for protocol-owned turn options.
203    #[serde(default, skip_serializing_if = "Option::is_none")]
204    pub protocol_turn_options: Option<crate::ProtocolTurnOptions>,
205    /// Optional externally-stable trace turn id. Normal runtime callers leave
206    /// this empty and the runtime generates one per outer turn.
207    #[serde(default, skip_serializing_if = "Option::is_none")]
208    pub trace_turn_id: Option<String>,
209    #[serde(skip)]
210    pub protocol_extension: Option<ProtocolTurnExtensionHandle>,
211    #[serde(skip)]
212    pub turn_context: TurnContext,
213}
214
215impl TurnInput {
216    pub fn empty() -> Self {
217        Self::items(std::iter::empty())
218    }
219
220    pub fn text(text: impl Into<String>) -> Self {
221        Self::items([InputItem::text(text)])
222    }
223
224    pub fn items(items: impl IntoIterator<Item = InputItem>) -> Self {
225        Self {
226            items: items.into_iter().collect(),
227            image_blobs: HashMap::new(),
228            protocol_turn_options: None,
229            trace_turn_id: None,
230            protocol_extension: None,
231            turn_context: TurnContext::default(),
232        }
233    }
234
235    pub fn with_image_blob(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
236        self.image_blobs.insert(id.into(), bytes);
237        self
238    }
239
240    pub fn with_image_blobs<I, K>(mut self, image_blobs: I) -> Self
241    where
242        I: IntoIterator<Item = (K, Vec<u8>)>,
243        K: Into<String>,
244    {
245        self.image_blobs.extend(
246            image_blobs
247                .into_iter()
248                .map(|(id, bytes)| (id.into(), bytes)),
249        );
250        self
251    }
252
253    pub fn with_image_ref(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
254        let id = id.into();
255        self.items.push(InputItem::image_ref(id.clone()));
256        self.image_blobs.insert(id, bytes);
257        self
258    }
259
260    pub fn with_protocol_turn_options(mut self, options: crate::ProtocolTurnOptions) -> Self {
261        self.protocol_turn_options = Some(options);
262        self
263    }
264
265    pub fn with_trace_turn_id(mut self, trace_turn_id: impl Into<String>) -> Self {
266        self.trace_turn_id = Some(trace_turn_id.into());
267        self
268    }
269}
270
271/// Per-turn, in-process side channel of typed plugin inputs.
272///
273/// This is an `Any`-keyed map of live Rust values handed to plugins for a
274/// single turn. It is deliberately **not** serializable: the values never
275/// survive a process boundary, so durable effect-host runs explicitly reject a
276/// turn that carries any live inputs (see
277/// [`LiveTurnInputs::durable_effect_rejection`]). Durable callers must instead
278/// encode replayable data in
279/// `protocol_turn_options` or persisted plugin state.
280#[derive(Clone, Default)]
281pub struct LiveTurnInputs {
282    inputs: HashMap<&'static str, Arc<dyn Any + Send + Sync>>,
283}
284
285impl LiveTurnInputs {
286    fn insert<T>(&mut self, plugin_id: &'static str, input: T)
287    where
288        T: Send + Sync + 'static,
289    {
290        self.inputs.insert(plugin_id, Arc::new(input));
291    }
292
293    fn get<T>(&self, plugin_id: &'static str) -> Option<&T>
294    where
295        T: 'static,
296    {
297        self.inputs
298            .get(plugin_id)
299            .and_then(|input| input.downcast_ref::<T>())
300    }
301
302    fn contains(&self, plugin_id: &'static str) -> bool {
303        self.inputs.contains_key(plugin_id)
304    }
305
306    fn is_empty(&self) -> bool {
307        self.inputs.is_empty()
308    }
309
310    pub fn plugin_ids(&self) -> Vec<&'static str> {
311        self.inputs.keys().copied().collect()
312    }
313
314    /// Returns an error when live per-turn inputs would make a durable effect
315    /// host replay depend on process-local values.
316    pub(crate) fn durable_effect_rejection(&self) -> Result<(), RuntimeError> {
317        if self.is_empty() {
318            return Ok(());
319        }
320        Err(RuntimeError::new(
321            RuntimeErrorCode::DurableEffectLivePluginInput,
322            "durable effect hosts do not support live TurnContext plugin inputs; encode replayable data in protocol_turn_options or persisted plugin state",
323        ))
324    }
325}
326
327#[derive(Clone, Default)]
328pub struct TurnContext {
329    plugin_inputs: LiveTurnInputs,
330    provider: Option<crate::ProviderHandle>,
331    model: Option<crate::ModelSpec>,
332    prompt: crate::PromptLayer,
333}
334
335impl TurnContext {
336    pub fn new() -> Self {
337        Self::default()
338    }
339
340    pub fn insert_plugin_input<T>(&mut self, plugin_id: &'static str, input: T)
341    where
342        T: Send + Sync + 'static,
343    {
344        self.plugin_inputs.insert(plugin_id, input);
345    }
346
347    pub fn set_provider(&mut self, provider: crate::ProviderHandle) {
348        self.provider = Some(provider);
349    }
350
351    pub fn provider(&self) -> Option<&crate::ProviderHandle> {
352        self.provider.as_ref()
353    }
354
355    pub fn set_model(&mut self, model: crate::ModelSpec) {
356        self.model = Some(model);
357    }
358
359    pub fn model_spec(&self) -> Option<&crate::ModelSpec> {
360        self.model.as_ref()
361    }
362
363    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
364    where
365        T: 'static,
366    {
367        self.plugin_inputs.get(plugin_id)
368    }
369
370    pub fn has_plugin_input(&self, plugin_id: &'static str) -> bool {
371        self.plugin_inputs.contains(plugin_id)
372    }
373
374    pub fn has_live_plugin_inputs(&self) -> bool {
375        !self.plugin_inputs.is_empty()
376    }
377
378    pub fn live_plugin_input_ids(&self) -> Vec<&'static str> {
379        self.plugin_inputs.plugin_ids()
380    }
381
382    /// Live plugin inputs for this turn. The durable boundary inspects this to
383    /// reject turns carrying non-serializable live state.
384    pub(crate) fn live_plugin_inputs(&self) -> &LiveTurnInputs {
385        &self.plugin_inputs
386    }
387
388    pub fn set_prompt_template(&mut self, template: crate::PromptTemplate) {
389        self.prompt.template = Some(template);
390    }
391
392    pub fn add_prompt_contribution(&mut self, contribution: crate::PromptContribution) {
393        self.prompt.add_contribution(contribution);
394    }
395
396    pub fn replace_prompt_slot(
397        &mut self,
398        slot: crate::PromptSlot,
399        contributions: impl IntoIterator<Item = crate::PromptContribution>,
400    ) {
401        self.prompt.replace_slot(slot, contributions);
402    }
403
404    pub fn clear_prompt_slot(&mut self, slot: crate::PromptSlot) {
405        self.prompt.clear_slot(slot);
406    }
407
408    pub fn set_prompt_layer(&mut self, prompt: crate::PromptLayer) {
409        self.prompt = prompt;
410    }
411
412    pub fn prompt_layer(&self) -> &crate::PromptLayer {
413        &self.prompt
414    }
415}
416
417impl fmt::Debug for TurnContext {
418    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
419        f.debug_struct("TurnContext")
420            .field("plugin_inputs", &self.plugin_inputs.plugin_ids())
421            .field("has_provider", &self.provider.is_some())
422            .field("has_model", &self.model.is_some())
423            .field("has_prompt_layer", &(!self.prompt.is_empty()))
424            .finish()
425    }
426}
427
428#[derive(Clone)]
429pub struct ProtocolTurnExtensionHandle(Arc<dyn ProtocolTurnExtension>);
430
431impl ProtocolTurnExtensionHandle {
432    pub fn new(extension: impl ProtocolTurnExtension + 'static) -> Self {
433        Self(Arc::new(extension))
434    }
435
436    pub fn as_any(&self) -> &dyn Any {
437        self.0.as_any()
438    }
439
440    pub fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
441        self.0.prompt_contributions()
442    }
443}
444
445impl fmt::Debug for ProtocolTurnExtensionHandle {
446    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447        f.write_str("ProtocolTurnExtensionHandle(..)")
448    }
449}
450
451pub trait ProtocolTurnExtension: Send + Sync {
452    fn as_any(&self) -> &dyn Any;
453
454    fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
455        Vec::new()
456    }
457}
458
459#[derive(Clone)]
460pub struct ProtocolSessionExtensionHandle(Arc<dyn ProtocolSessionExtension>);
461
462impl ProtocolSessionExtensionHandle {
463    pub fn new(extension: impl ProtocolSessionExtension + 'static) -> Self {
464        Self(Arc::new(extension))
465    }
466
467    pub fn as_any(&self) -> &dyn Any {
468        self.0.as_any()
469    }
470}
471
472impl fmt::Debug for ProtocolSessionExtensionHandle {
473    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
474        f.write_str("ProtocolSessionExtensionHandle(..)")
475    }
476}
477
478pub trait ProtocolSessionExtension: Send + Sync {
479    fn as_any(&self) -> &dyn Any;
480}
481
482#[derive(Clone, Debug)]
483pub(super) enum NormalizedItem {
484    Text(String),
485    Image(crate::AttachmentRef),
486}
487
488/// Canonical assistant output payload.
489#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
490pub struct AssistantOutput {
491    pub safe_text: String,
492    pub raw_text: String,
493    pub state: OutputState,
494}
495
496/// Quality and usability of assembled terminal output.
497#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
498#[serde(rename_all = "snake_case")]
499pub enum OutputState {
500    Usable,
501    EmptyOutput,
502    TracebackOnly,
503    RecoveredFromError,
504}
505
506/// RLM code execution output observed during a turn.
507#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
508pub struct CodeOutputRecord {
509    pub output: String,
510    #[serde(default, skip_serializing_if = "Option::is_none")]
511    pub error: Option<String>,
512}
513
514/// High-level execution summary for a completed turn.
515#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
516pub struct ExecutionSummary {
517    #[serde(default)]
518    pub had_tool_calls: bool,
519    #[serde(default)]
520    pub had_code_execution: bool,
521}
522
523/// Structured issue surfaced during turn execution.
524#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
525pub struct TurnIssue {
526    pub kind: String,
527    #[serde(default, skip_serializing_if = "Option::is_none")]
528    pub code: Option<String>,
529    #[serde(default, skip_serializing_if = "Option::is_none")]
530    pub terminal_reason: Option<crate::LlmTerminalReason>,
531    pub message: String,
532    #[serde(default, skip_serializing_if = "Option::is_none")]
533    pub raw: Option<String>,
534}
535
536/// Canonical high-level turn result returned to hosts.
537#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
538pub struct AssembledTurn {
539    pub state: SessionSnapshot,
540    pub outcome: crate::TurnOutcome,
541    pub assistant_output: AssistantOutput,
542    pub execution: ExecutionSummary,
543    #[serde(default)]
544    pub token_usage: TokenUsage,
545    /// Per-(session, source, model) ledger entries for child sessions whose
546    /// LLM calls completed during this turn. `token_usage` above is the
547    /// parent's own LLM tokens; `total_usage` (on the embed-facing
548    /// `TurnResult`) sums both.
549    #[serde(default)]
550    pub children_usage: Vec<TokenLedgerEntry>,
551    #[serde(default)]
552    pub tool_calls: Vec<ToolCallRecord>,
553    #[serde(default)]
554    pub errors: Vec<TurnIssue>,
555}
556
557/// Result of driving one logical host turn through any AgentFrame switches.
558///
559/// A frame switch is an internal runtime continuation, similar to compaction
560/// from a host's perspective. Callers that need a final answer can use
561/// [`LashRuntime::stream_turn_with_agent_frames`] and inspect `final_turn()`.
562#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
563pub struct AgentFrameRun {
564    pub turns: Vec<AssembledTurn>,
565}
566
567impl AgentFrameRun {
568    pub fn final_turn(&self) -> Option<&AssembledTurn> {
569        self.turns.last()
570    }
571
572    pub fn into_final_turn(mut self) -> Option<AssembledTurn> {
573        self.turns.pop()
574    }
575
576    pub fn frame_switch_count(&self) -> usize {
577        self.turns
578            .iter()
579            .filter(|turn| matches!(turn.outcome, crate::TurnOutcome::AgentFrameSwitch { .. }))
580            .count()
581    }
582}
583
584/// Termination policy knobs.
585#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
586pub struct TerminationPolicy {
587    #[serde(default)]
588    pub treat_missing_done_as_failure: bool,
589}
590
591impl Default for TerminationPolicy {
592    fn default() -> Self {
593        Self {
594            treat_missing_done_as_failure: true,
595        }
596    }
597}
598
599/// Host event sink for low-level streaming runtime events.
600/// `SessionEvent` is protocol-specific preview/progress data.
601#[async_trait::async_trait]
602pub trait EventSink: Send + Sync {
603    fn is_noop(&self) -> bool {
604        false
605    }
606
607    async fn emit(&self, event: SessionEvent);
608}
609
610/// No-op sink useful for callers that only care about final state.
611pub struct NoopEventSink;
612
613/// Static no-op event sink for callers that need a `&dyn EventSink` default.
614pub static NOOP_EVENT_SINK: NoopEventSink = NoopEventSink;
615
616#[async_trait::async_trait]
617impl EventSink for NoopEventSink {
618    fn is_noop(&self) -> bool {
619        true
620    }
621
622    async fn emit(&self, _event: SessionEvent) {}
623}
624
625/// Stable identifier for a semantic turn activity.
626#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
627#[serde(transparent)]
628pub struct TurnActivityId(pub String);
629
630impl TurnActivityId {
631    pub fn new(id: impl Into<String>) -> Self {
632        Self(id.into())
633    }
634
635    pub fn fresh() -> Self {
636        Self(uuid::Uuid::new_v4().to_string())
637    }
638}
639
640/// App-facing semantic activity emitted during a turn.
641///
642/// `id` is unique per emitted activity event. `correlation_id` groups related
643/// events in the same logical activity, such as code start/completion, tool
644/// start/completion, or text deltas from one output block.
645#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
646pub struct TurnActivity {
647    pub id: TurnActivityId,
648    pub correlation_id: TurnActivityId,
649    #[serde(flatten)]
650    pub event: TurnEvent,
651}
652
653impl TurnActivity {
654    pub fn new(correlation_id: TurnActivityId, event: TurnEvent) -> Self {
655        Self {
656            id: TurnActivityId::fresh(),
657            correlation_id,
658            event,
659        }
660    }
661
662    pub fn independent(event: TurnEvent) -> Self {
663        let correlation_id = TurnActivityId::fresh();
664        Self::new(correlation_id, event)
665    }
666}
667
668/// App-facing semantic event payload for a turn activity.
669///
670/// Unlike [`SessionEvent`], these events are stable application signals rather
671/// than low-level runtime/debug events. Public streams carry these payloads
672/// inside [`TurnActivity`] so every emitted item has identity.
673#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
674#[serde(tag = "type", rename_all = "snake_case")]
675#[allow(clippy::large_enum_variant)]
676pub enum TurnEvent {
677    QueuedWorkStarted {
678        boundary: crate::QueuedWorkClaimBoundary,
679        batch_ids: Vec<String>,
680        causes: Vec<crate::TurnCause>,
681    },
682    ModelRequestStarted {
683        protocol_iteration: usize,
684    },
685    AssistantProseDelta {
686        text: String,
687    },
688    ReasoningDelta {
689        text: String,
690    },
691    CodeBlockStarted {
692        language: String,
693        code: String,
694        #[serde(default, skip_serializing_if = "Option::is_none")]
695        graph_key: Option<String>,
696    },
697    CodeBlockCompleted {
698        language: String,
699        output: String,
700        error: Option<String>,
701        success: bool,
702        duration_ms: u64,
703        tool_call_ids: Vec<String>,
704        #[serde(default, skip_serializing_if = "Option::is_none")]
705        graph_key: Option<String>,
706    },
707    ToolCallStarted {
708        call_id: Option<String>,
709        name: String,
710        args: serde_json::Value,
711    },
712    ToolCallCompleted {
713        call_id: Option<String>,
714        name: String,
715        args: serde_json::Value,
716        output: crate::ToolCallOutput,
717        duration_ms: u64,
718    },
719    SubmittedValue {
720        value: serde_json::Value,
721    },
722    ToolValue {
723        tool_name: String,
724        value: serde_json::Value,
725    },
726    Usage {
727        protocol_iteration: usize,
728        usage: TokenUsage,
729        cumulative: TokenUsage,
730    },
731    ChildUsage {
732        session_id: String,
733        source: String,
734        model: String,
735        protocol_iteration: usize,
736        usage: TokenUsage,
737        cumulative: TokenUsage,
738    },
739    RetryStatus {
740        wait_seconds: u64,
741        attempt: usize,
742        max_attempts: usize,
743        reason: String,
744    },
745    PluginRuntime {
746        plugin_id: String,
747        event: crate::PluginRuntimeEvent,
748    },
749    QueuedInputAccepted {
750        checkpoint: crate::CheckpointKind,
751        inputs: Vec<crate::AcceptedInjectedTurnInput>,
752    },
753    QueuedMessagesCommitted {
754        messages: Vec<crate::PluginMessage>,
755        checkpoint: crate::CheckpointKind,
756    },
757    Error {
758        message: String,
759    },
760}
761
762#[async_trait::async_trait]
763pub trait TurnActivitySink: Send + Sync {
764    fn is_noop(&self) -> bool {
765        false
766    }
767
768    async fn emit(&self, activity: TurnActivity);
769}
770
771pub struct NoopTurnActivitySink;
772
773/// Static no-op turn-activity sink for callers that need a `&dyn TurnActivitySink` default.
774pub static NOOP_TURN_ACTIVITY_SINK: NoopTurnActivitySink = NoopTurnActivitySink;
775
776#[async_trait::async_trait]
777impl TurnActivitySink for NoopTurnActivitySink {
778    fn is_noop(&self) -> bool {
779        true
780    }
781
782    async fn emit(&self, _activity: TurnActivity) {}
783}
784
785/// Optional sinks and scoped effect controller passed to one of [`LashRuntime`]'s
786/// turn-driving entry points (`stream_turn`,
787/// `stream_turn_with_agent_frames`).
788///
789/// Construct via [`TurnOptions::new`] and chain `with_*` builders. Event sinks
790/// default to no-op sinks. Effect scope is explicit and required at every
791/// runtime boundary that can execute nondeterministic work.
792pub struct TurnOptions<'a> {
793    events: Option<&'a dyn EventSink>,
794    turn_events: Option<&'a dyn TurnActivitySink>,
795    scoped_effect_controller: ScopedEffectController<'a>,
796    cancel: CancellationToken,
797}
798
799impl<'a> TurnOptions<'a> {
800    pub fn new(
801        cancel: CancellationToken,
802        scoped_effect_controller: ScopedEffectController<'a>,
803    ) -> Self {
804        Self {
805            events: None,
806            turn_events: None,
807            scoped_effect_controller,
808            cancel,
809        }
810    }
811
812    pub fn with_events(mut self, events: &'a dyn EventSink) -> Self {
813        self.events = Some(events);
814        self
815    }
816
817    pub fn with_turn_events(mut self, turn_events: &'a dyn TurnActivitySink) -> Self {
818        self.turn_events = Some(turn_events);
819        self
820    }
821
822    pub(crate) fn events_or_noop(&self) -> &'a dyn EventSink {
823        self.events.unwrap_or(&NOOP_EVENT_SINK)
824    }
825
826    pub(crate) fn turn_events_or_noop(&self) -> &'a dyn TurnActivitySink {
827        self.turn_events.unwrap_or(&NOOP_TURN_ACTIVITY_SINK)
828    }
829
830    pub(crate) fn effect_scope_id(&self) -> &str {
831        self.scoped_effect_controller.scope_id()
832    }
833
834    pub(crate) fn scoped_effect_controller(&self) -> ScopedEffectController<'a> {
835        self.scoped_effect_controller.clone()
836    }
837}
838
839enum RuntimeStreamEvent {
840    Session(SessionEvent),
841    Turn(TurnActivity),
842}
843
844#[derive(Clone)]
845pub struct SessionStoreCreateRequest {
846    pub session_id: String,
847    pub relation: SessionRelation,
848    pub policy: SessionPolicy,
849}
850
851impl SessionStoreCreateRequest {
852    pub fn parent_session_id(&self) -> Option<&str> {
853        self.relation.parent_session_id()
854    }
855}
856
857#[async_trait::async_trait]
858pub trait SessionStoreFactory: Send + Sync {
859    /// Durability tier the stores produced by this factory provide; defaults to
860    /// [`DurabilityTier::Inline`].
861    fn durability_tier(&self) -> crate::DurabilityTier {
862        crate::DurabilityTier::Inline
863    }
864
865    async fn create_store(
866        &self,
867        request: &SessionStoreCreateRequest,
868    ) -> Result<Arc<dyn crate::store::RuntimePersistence>, String>;
869
870    async fn open_existing_store(
871        &self,
872        _request: &SessionStoreCreateRequest,
873    ) -> Result<Option<Arc<dyn crate::store::RuntimePersistence>>, String> {
874        Ok(None)
875    }
876
877    async fn delete_session(&self, session_id: &str) -> Result<(), String>;
878}
879
880/// Generic runtime for CLI or programmatic embedding.
881pub struct LashRuntime {
882    pub(in crate::runtime) session: Option<Session>,
883    pub(in crate::runtime) policy: SessionPolicy,
884    pub(in crate::runtime) host: RuntimeHost,
885    pub(in crate::runtime) services: RuntimeServices,
886    pub(in crate::runtime) state: RuntimeSessionState,
887    pub(in crate::runtime) runtime_scope_id: Arc<str>,
888    pub(in crate::runtime) managed_sessions: Arc<Mutex<HashMap<String, RuntimeHandle>>>,
889    pub(in crate::runtime) managed_turns: Arc<Mutex<HashMap<String, ManagedSessionTurn>>>,
890    /// Protocol-owned turn options for this session.
891    pub(in crate::runtime) protocol_turn_options: crate::ProtocolTurnOptions,
892    /// Session-scoped token cost ledger. Shared by ALL
893    /// `RuntimeSessionServices` instances created from this runtime
894    /// (both per-turn and async maintenance). Entries accumulate here
895    /// and are drained into `state.token_ledger` at turn-commit time.
896    pub(in crate::runtime) shared_token_ledger: Arc<std::sync::Mutex<Vec<TokenLedgerEntry>>>,
897    pub(in crate::runtime) process_sync_needed: Arc<AtomicBool>,
898    pub(in crate::runtime) turn_phase_probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
899    /// Resident-graph policy chosen by the host. Controls whether
900    /// [`LashRuntime::refresh_session_graph_from_store`] reloads the full
901    /// graph or just the active path, matching the trimming behavior set at
902    /// load time via [`apply_residency_on_load`](crate::runtime::apply_residency_on_load).
903    pub(in crate::runtime) residency: Residency,
904}