Skip to main content

lash_core/runtime/
mod.rs

1mod assembly;
2mod builder;
3pub(crate) mod causal;
4mod config_ops;
5mod effect;
6mod environment;
7mod error;
8mod host;
9mod in_memory_store;
10mod io;
11mod lifecycle;
12mod observation;
13mod process;
14mod process_work_runner;
15mod process_worker;
16mod queued_work_runner;
17mod session_api;
18mod session_manager;
19mod session_ops;
20mod state;
21#[cfg(test)]
22pub(crate) mod tests;
23mod turn_boundary;
24mod turn_commit_draft;
25mod turn_driver;
26mod turn_graph_editor;
27mod turn_loop;
28mod turn_queue;
29mod usage;
30
31use std::any::Any;
32use std::collections::HashMap;
33use std::fmt;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::atomic::{AtomicBool, Ordering};
37
38use tokio::sync::{Mutex, mpsc};
39use tokio_util::sync::CancellationToken;
40
41use crate::llm::types::{
42    LlmOutputPart, LlmProviderTraceEvent, LlmProviderTraceSender, LlmRequest, LlmResponse,
43    LlmStreamEvent, LlmUsage,
44};
45use crate::plugin::{
46    CheckpointHookContext, PrepareTurnRequest, SessionConfigChangedContext, SessionRelation,
47};
48use crate::sansio::{LlmCallError, Response};
49use crate::session_model::{
50    Message, MessageRole, Part, PartKind, PruneState, RuntimeSessionPolicy, SessionEvent,
51    SessionPolicy, TokenUsage, fresh_message_id, make_error_event, reassign_part_ids, shared_parts,
52    transport_stream_events,
53};
54use crate::{
55    CheckpointKind, PersistentRuntimeServices, PluginActionInvokeError, PromptHookContext,
56    RuntimeServices, SandboxMessage, Session, SessionCreateRequest, SessionError, SessionHandle,
57    SessionSnapshot, SessionStartPoint, ToolCallRecord, TurnFinish, TurnOutcome, TurnStop,
58};
59use crate::{Effect, TurnMachine};
60
61use host::*;
62use session_manager::*;
63use turn_boundary::*;
64use turn_commit_draft::*;
65use turn_driver::*;
66
67pub(crate) const RUNTIME_TURN_LEASE_TTL_MS: u64 = 15 * 60 * 1000;
68
69// `PromptUsage` is re-exported below alongside the runtime's own types.
70pub use lash_sansio::PromptUsage;
71
72use assembly::{
73    LlmDebugText, LlmDebugToolCall, LlmStreamAccumulator, LlmStreamDebugState, LlmStreamEventLog,
74    LlmStreamState, LlmStreamSummary, TurnAssembler,
75};
76#[cfg(test)]
77#[allow(unused_imports)]
78use assembly::{classify_output_state, sanitize_assistant_output};
79pub use builder::EmbeddedRuntimeBuilder;
80pub use causal::process_event_invocation;
81pub(crate) use causal::tool_retry_sleep_invocation;
82pub(crate) use effect::RuntimeEffectControllerHandle;
83pub use effect::{
84    CausalRef, EffectHost, EffectScope, InlineEffectHost, InlineRuntimeEffectController,
85    LlmAttachmentSpec, LlmRequestSpec, ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand,
86    RuntimeEffectController, RuntimeEffectControllerError, RuntimeEffectEnvelope,
87    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
88    RuntimeReplay, RuntimeScope, RuntimeSubject, ScopedEffectController,
89};
90pub use environment::{ParkedSession, Residency, RuntimeEnvironment, RuntimeEnvironmentBuilder};
91pub use error::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
92pub use host::{EmbeddedRuntimeHost, ProcessRuntimeHost, RuntimeHostConfig};
93pub use in_memory_store::{InMemorySessionStore, InMemorySessionStoreFactory};
94use io::normalize_input_items;
95pub use observation::{
96    InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
97    LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
98    LiveReplaySubscription, RuntimeHandle, RuntimeObservation, SessionCursor, SessionCursorError,
99    SessionObservation, SessionObservationEvent, SessionObservationEventPayload,
100    SessionObservationSubscription, SessionProcessEventKind, SessionQueueEventKind, SessionResume,
101    SessionRevision,
102};
103#[cfg(any(test, feature = "testing"))]
104pub use process::TestLocalProcessRegistry;
105pub use process::{
106    DefaultProcessCancelAbility, ObservedProcess, ObservedProcessEvent, ObservedWorkItem,
107    PROCESS_LEASE_SCHEMA_VERSION, PreparedProcessEventAppend, ProcessAwaitOutput,
108    ProcessCancelAbility, ProcessCancelAllRequest, ProcessCancelRequest, ProcessCancelSource,
109    ProcessCancelSummary, ProcessDefinitionSelector, ProcessDefinitionSummary, ProcessEvent,
110    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessEventSemantics,
111    ProcessEventSemanticsSpec, ProcessEventType, ProcessExecutionContext, ProcessExecutionEnvRef,
112    ProcessExecutionEnvSpec, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
113    ProcessHandleGrantEntry, ProcessHandleSummary, ProcessId, ProcessInput, ProcessLease,
114    ProcessLeaseCompletion, ProcessLifecycleStatus, ProcessListFilter, ProcessListMode,
115    ProcessOpScope, ProcessOriginator, ProcessProvenance, ProcessRecord, ProcessRegistration,
116    ProcessRegistry, ProcessService, ProcessSessionDeleteReport, ProcessStartGrant,
117    ProcessSpawnProvenance, ProcessStartOptions, ProcessStartRequest, ProcessStatus,
118    ProcessStatusFilter,
119    ProcessTerminalSemantics, ProcessTerminalSpec, ProcessTerminalState, ProcessValueSelector,
120    ProcessWake, ProcessWakeDedupeKey, ProcessWakeDelivery, ProcessWakeSpec, ProcessWorkObserver,
121    ProcessWorkSnapshot, SessionScope, SessionScopeId, UnavailableProcessService, WaitKind,
122    WaitState, current_epoch_ms, epoch_ms_from_system_time, lashlang_process_event_types,
123    lashlang_process_signal_event_types, load_process_execution_env,
124    materialize_process_event_semantics, persist_process_execution_env,
125    prepare_process_event_append, prepare_process_registration, process_event_payload_hash,
126    process_signal_event_type, process_signal_name_from_event_type, process_signal_wait_key,
127    process_wake_delivery, process_wake_input_from_event_payload, process_wake_turn_cause,
128    process_wake_turn_text, require_event_replay, system_time_from_epoch_ms,
129    validate_process_signal_name,
130};
131pub use process_work_runner::{
132    InlineProcessRunHandle, ProcessRunHandle, ProcessWorkDriver, ProcessWorkPoke, ProcessWorkRunner,
133};
134pub use process_worker::{DurableProcessWorker, DurableProcessWorkerConfig};
135pub use queued_work_runner::{
136    QueuedWorkPoke, QueuedWorkRunHandle, QueuedWorkRunOutcome, QueuedWorkRunRequest,
137    QueuedWorkRunner,
138};
139pub use session_manager::DirectCompletionClient;
140pub use state::RuntimeSessionState;
141use state::{
142    append_session_nodes_to_state, apply_residency_on_load, apply_session_checkpoint,
143    apply_session_head, normalize_session_graph, open_agent_frame_in_state,
144};
145pub use turn_loop::ensure_durable_effect_input;
146pub use turn_queue::{
147    DeliveryPolicy, MergeKey, QUEUED_WORK_CLAIM_TTL_MS, QueuedCheckpointWork, QueuedTurnWork,
148    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
149    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, SessionCommand, SessionCommandReceipt,
150    SlotPolicy, process_wake_batch_draft,
151};
152pub use usage::{
153    SessionUsageReport, TokenLedgerEntry, UsageReportRow, UsageTotals, diff_token_ledger,
154    diff_usage_reports,
155};
156use usage::{merge_ledger_entry, merge_usage_delta_entries, normalize_prompt_usage};
157
158#[doc(hidden)]
159#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
160pub enum RuntimeTurnPhase {
161    ContextTransform,
162    BeforeTurnHooks,
163    PromptBuild,
164    EffectLoop,
165    FinalizeTurn,
166    PersistTurn,
167    FinalCommit,
168    PostPersistHooks,
169}
170
171#[doc(hidden)]
172pub trait RuntimeTurnPhaseProbe: Send + Sync {
173    fn begin(&self, phase: RuntimeTurnPhase);
174    fn end(&self, phase: RuntimeTurnPhase);
175    fn begin_named(&self, _phase: &str) {}
176    fn end_named(&self, _phase: &str) {}
177}
178
179/// Host-provided per-turn input.
180#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
181#[serde(tag = "type", rename_all = "snake_case")]
182pub enum InputItem {
183    Text { text: String },
184    ImageRef { id: String },
185}
186
187impl InputItem {
188    pub fn text(text: impl Into<String>) -> Self {
189        Self::Text { text: text.into() }
190    }
191
192    pub fn image_ref(id: impl Into<String>) -> Self {
193        Self::ImageRef { id: id.into() }
194    }
195}
196
197/// Host-provided per-turn input.
198#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
199pub struct TurnInput {
200    pub items: Vec<InputItem>,
201    #[serde(default)]
202    pub image_blobs: HashMap<String, Vec<u8>>,
203    /// Per-turn override for protocol-owned turn options.
204    #[serde(default, skip_serializing_if = "Option::is_none")]
205    pub protocol_turn_options: Option<crate::ProtocolTurnOptions>,
206    /// Optional externally-stable trace turn id. Normal runtime callers leave
207    /// this empty and the runtime generates one per outer turn.
208    #[serde(default, skip_serializing_if = "Option::is_none")]
209    pub trace_turn_id: Option<String>,
210    #[serde(skip)]
211    pub protocol_extension: Option<ProtocolTurnExtensionHandle>,
212    #[serde(skip)]
213    pub turn_context: TurnContext,
214}
215
216impl TurnInput {
217    pub fn empty() -> Self {
218        Self::items(std::iter::empty())
219    }
220
221    pub fn text(text: impl Into<String>) -> Self {
222        Self::items([InputItem::text(text)])
223    }
224
225    pub fn items(items: impl IntoIterator<Item = InputItem>) -> Self {
226        Self {
227            items: items.into_iter().collect(),
228            image_blobs: HashMap::new(),
229            protocol_turn_options: None,
230            trace_turn_id: None,
231            protocol_extension: None,
232            turn_context: TurnContext::default(),
233        }
234    }
235
236    pub fn with_image_blob(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
237        self.image_blobs.insert(id.into(), bytes);
238        self
239    }
240
241    pub fn with_image_blobs<I, K>(mut self, image_blobs: I) -> Self
242    where
243        I: IntoIterator<Item = (K, Vec<u8>)>,
244        K: Into<String>,
245    {
246        self.image_blobs.extend(
247            image_blobs
248                .into_iter()
249                .map(|(id, bytes)| (id.into(), bytes)),
250        );
251        self
252    }
253
254    pub fn with_image_ref(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
255        let id = id.into();
256        self.items.push(InputItem::image_ref(id.clone()));
257        self.image_blobs.insert(id, bytes);
258        self
259    }
260
261    pub fn with_protocol_turn_options(mut self, options: crate::ProtocolTurnOptions) -> Self {
262        self.protocol_turn_options = Some(options);
263        self
264    }
265
266    pub fn with_trace_turn_id(mut self, trace_turn_id: impl Into<String>) -> Self {
267        self.trace_turn_id = Some(trace_turn_id.into());
268        self
269    }
270}
271
272/// Per-turn, in-process side channel of typed plugin inputs.
273///
274/// This is an `Any`-keyed map of live Rust values handed to plugins for a
275/// single turn. It is deliberately **not** serializable: the values never
276/// survive a process boundary, so durable effect-host runs explicitly reject a
277/// turn that carries any live inputs (see
278/// [`LiveTurnInputs::durable_effect_rejection`]). Durable callers must instead
279/// encode replayable data in
280/// `protocol_turn_options` or persisted plugin state.
281#[derive(Clone, Default)]
282pub struct LiveTurnInputs {
283    inputs: HashMap<&'static str, Arc<dyn Any + Send + Sync>>,
284}
285
286impl LiveTurnInputs {
287    fn insert<T>(&mut self, plugin_id: &'static str, input: T)
288    where
289        T: Send + Sync + 'static,
290    {
291        self.inputs.insert(plugin_id, Arc::new(input));
292    }
293
294    fn get<T>(&self, plugin_id: &'static str) -> Option<&T>
295    where
296        T: 'static,
297    {
298        self.inputs
299            .get(plugin_id)
300            .and_then(|input| input.downcast_ref::<T>())
301    }
302
303    fn contains(&self, plugin_id: &'static str) -> bool {
304        self.inputs.contains_key(plugin_id)
305    }
306
307    fn is_empty(&self) -> bool {
308        self.inputs.is_empty()
309    }
310
311    pub fn plugin_ids(&self) -> Vec<&'static str> {
312        self.inputs.keys().copied().collect()
313    }
314
315    /// Returns an error when live per-turn inputs would make a durable effect
316    /// host replay depend on process-local values.
317    pub(crate) fn durable_effect_rejection(&self) -> Result<(), RuntimeError> {
318        if self.is_empty() {
319            return Ok(());
320        }
321        Err(RuntimeError::new(
322            RuntimeErrorCode::DurableEffectLivePluginInput,
323            "durable effect hosts do not support live TurnContext plugin inputs; encode replayable data in protocol_turn_options or persisted plugin state",
324        ))
325    }
326}
327
328#[derive(Clone, Default)]
329pub struct TurnContext {
330    plugin_inputs: LiveTurnInputs,
331    provider: Option<crate::ProviderHandle>,
332    model: Option<crate::ModelSpec>,
333    prompt: crate::PromptLayer,
334}
335
336impl TurnContext {
337    pub fn new() -> Self {
338        Self::default()
339    }
340
341    pub fn insert_plugin_input<T>(&mut self, plugin_id: &'static str, input: T)
342    where
343        T: Send + Sync + 'static,
344    {
345        self.plugin_inputs.insert(plugin_id, input);
346    }
347
348    pub fn set_provider(&mut self, provider: crate::ProviderHandle) {
349        self.provider = Some(provider);
350    }
351
352    pub fn provider(&self) -> Option<&crate::ProviderHandle> {
353        self.provider.as_ref()
354    }
355
356    pub fn set_model(&mut self, model: crate::ModelSpec) {
357        self.model = Some(model);
358    }
359
360    pub fn model_spec(&self) -> Option<&crate::ModelSpec> {
361        self.model.as_ref()
362    }
363
364    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
365    where
366        T: 'static,
367    {
368        self.plugin_inputs.get(plugin_id)
369    }
370
371    pub fn has_plugin_input(&self, plugin_id: &'static str) -> bool {
372        self.plugin_inputs.contains(plugin_id)
373    }
374
375    pub fn has_live_plugin_inputs(&self) -> bool {
376        !self.plugin_inputs.is_empty()
377    }
378
379    pub fn live_plugin_input_ids(&self) -> Vec<&'static str> {
380        self.plugin_inputs.plugin_ids()
381    }
382
383    /// Live plugin inputs for this turn. The durable boundary inspects this to
384    /// reject turns carrying non-serializable live state.
385    pub(crate) fn live_plugin_inputs(&self) -> &LiveTurnInputs {
386        &self.plugin_inputs
387    }
388
389    pub fn set_prompt_template(&mut self, template: crate::PromptTemplate) {
390        self.prompt.template = Some(template);
391    }
392
393    pub fn add_prompt_contribution(&mut self, contribution: crate::PromptContribution) {
394        self.prompt.add_contribution(contribution);
395    }
396
397    pub fn replace_prompt_slot(
398        &mut self,
399        slot: crate::PromptSlot,
400        contributions: impl IntoIterator<Item = crate::PromptContribution>,
401    ) {
402        self.prompt.replace_slot(slot, contributions);
403    }
404
405    pub fn clear_prompt_slot(&mut self, slot: crate::PromptSlot) {
406        self.prompt.clear_slot(slot);
407    }
408
409    pub fn set_prompt_layer(&mut self, prompt: crate::PromptLayer) {
410        self.prompt = prompt;
411    }
412
413    pub fn prompt_layer(&self) -> &crate::PromptLayer {
414        &self.prompt
415    }
416}
417
418impl fmt::Debug for TurnContext {
419    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
420        f.debug_struct("TurnContext")
421            .field("plugin_inputs", &self.plugin_inputs.plugin_ids())
422            .field("has_provider", &self.provider.is_some())
423            .field("has_model", &self.model.is_some())
424            .field("has_prompt_layer", &(!self.prompt.is_empty()))
425            .finish()
426    }
427}
428
429#[derive(Clone)]
430pub struct ProtocolTurnExtensionHandle(Arc<dyn ProtocolTurnExtension>);
431
432impl ProtocolTurnExtensionHandle {
433    pub fn new(extension: impl ProtocolTurnExtension + 'static) -> Self {
434        Self(Arc::new(extension))
435    }
436
437    pub fn as_any(&self) -> &dyn Any {
438        self.0.as_any()
439    }
440
441    pub fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
442        self.0.prompt_contributions()
443    }
444}
445
446impl fmt::Debug for ProtocolTurnExtensionHandle {
447    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
448        f.write_str("ProtocolTurnExtensionHandle(..)")
449    }
450}
451
452pub trait ProtocolTurnExtension: Send + Sync {
453    fn as_any(&self) -> &dyn Any;
454
455    fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
456        Vec::new()
457    }
458}
459
460#[derive(Clone)]
461pub struct ProtocolSessionExtensionHandle(Arc<dyn ProtocolSessionExtension>);
462
463impl ProtocolSessionExtensionHandle {
464    pub fn new(extension: impl ProtocolSessionExtension + 'static) -> Self {
465        Self(Arc::new(extension))
466    }
467
468    pub fn as_any(&self) -> &dyn Any {
469        self.0.as_any()
470    }
471}
472
473impl fmt::Debug for ProtocolSessionExtensionHandle {
474    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475        f.write_str("ProtocolSessionExtensionHandle(..)")
476    }
477}
478
479pub trait ProtocolSessionExtension: Send + Sync {
480    fn as_any(&self) -> &dyn Any;
481}
482
483#[derive(Clone, Debug)]
484pub(super) enum NormalizedItem {
485    Text(String),
486    Image(crate::AttachmentRef),
487}
488
489/// Canonical assistant output payload.
490#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
491pub struct AssistantOutput {
492    pub safe_text: String,
493    pub raw_text: String,
494    pub state: OutputState,
495}
496
497/// Quality and usability of assembled terminal output.
498#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
499#[serde(rename_all = "snake_case")]
500pub enum OutputState {
501    Usable,
502    EmptyOutput,
503    TracebackOnly,
504    RecoveredFromError,
505}
506
507/// RLM code execution output observed during a turn.
508#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
509pub struct CodeOutputRecord {
510    pub output: String,
511    #[serde(default, skip_serializing_if = "Option::is_none")]
512    pub error: Option<String>,
513}
514
515/// High-level execution summary for a completed turn.
516#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
517pub struct ExecutionSummary {
518    #[serde(default)]
519    pub had_tool_calls: bool,
520    #[serde(default)]
521    pub had_code_execution: bool,
522}
523
524/// Structured issue surfaced during turn execution.
525#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
526pub struct TurnIssue {
527    pub kind: String,
528    #[serde(default, skip_serializing_if = "Option::is_none")]
529    pub code: Option<String>,
530    #[serde(default, skip_serializing_if = "Option::is_none")]
531    pub terminal_reason: Option<crate::LlmTerminalReason>,
532    pub message: String,
533    #[serde(default, skip_serializing_if = "Option::is_none")]
534    pub raw: Option<String>,
535}
536
537/// Canonical high-level turn result returned to hosts.
538#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
539pub struct AssembledTurn {
540    pub state: SessionSnapshot,
541    pub outcome: crate::TurnOutcome,
542    pub assistant_output: AssistantOutput,
543    pub execution: ExecutionSummary,
544    #[serde(default)]
545    pub token_usage: TokenUsage,
546    /// Per-(session, source, model) ledger entries for child sessions whose
547    /// LLM calls completed during this turn. `token_usage` above is the
548    /// parent's own LLM tokens; `total_usage` (on the embed-facing
549    /// `TurnResult`) sums both.
550    #[serde(default)]
551    pub children_usage: Vec<TokenLedgerEntry>,
552    #[serde(default)]
553    pub tool_calls: Vec<ToolCallRecord>,
554    #[serde(default)]
555    pub errors: Vec<TurnIssue>,
556}
557
558/// Result of driving one logical host turn through any AgentFrame switches.
559///
560/// A frame switch is an internal runtime continuation, similar to compaction
561/// from a host's perspective. Callers that need a final answer can use
562/// [`LashRuntime::stream_turn_with_agent_frames`] and inspect `final_turn()`.
563#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
564pub struct AgentFrameRun {
565    pub turns: Vec<AssembledTurn>,
566}
567
568impl AgentFrameRun {
569    pub fn final_turn(&self) -> Option<&AssembledTurn> {
570        self.turns.last()
571    }
572
573    pub fn into_final_turn(mut self) -> Option<AssembledTurn> {
574        self.turns.pop()
575    }
576
577    pub fn frame_switch_count(&self) -> usize {
578        self.turns
579            .iter()
580            .filter(|turn| matches!(turn.outcome, crate::TurnOutcome::AgentFrameSwitch { .. }))
581            .count()
582    }
583}
584
585/// Termination policy knobs.
586#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
587pub struct TerminationPolicy {
588    #[serde(default)]
589    pub treat_missing_done_as_failure: bool,
590}
591
592impl Default for TerminationPolicy {
593    fn default() -> Self {
594        Self {
595            treat_missing_done_as_failure: true,
596        }
597    }
598}
599
600/// Host event sink for low-level streaming runtime events.
601/// `SessionEvent` is protocol-specific preview/progress data.
602#[async_trait::async_trait]
603pub trait EventSink: Send + Sync {
604    fn is_noop(&self) -> bool {
605        false
606    }
607
608    async fn emit(&self, event: SessionEvent);
609}
610
611/// No-op sink useful for callers that only care about final state.
612pub struct NoopEventSink;
613
614/// Static no-op event sink for callers that need a `&dyn EventSink` default.
615pub static NOOP_EVENT_SINK: NoopEventSink = NoopEventSink;
616
617#[async_trait::async_trait]
618impl EventSink for NoopEventSink {
619    fn is_noop(&self) -> bool {
620        true
621    }
622
623    async fn emit(&self, _event: SessionEvent) {}
624}
625
626/// Stable identifier for a semantic turn activity.
627#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
628#[serde(transparent)]
629pub struct TurnActivityId(pub String);
630
631impl TurnActivityId {
632    pub fn new(id: impl Into<String>) -> Self {
633        Self(id.into())
634    }
635
636    pub fn fresh() -> Self {
637        Self(uuid::Uuid::new_v4().to_string())
638    }
639}
640
641/// App-facing semantic activity emitted during a turn.
642///
643/// `id` is unique per emitted activity event. `correlation_id` groups related
644/// events in the same logical activity, such as code start/completion, tool
645/// start/completion, or text deltas from one output block.
646#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
647pub struct TurnActivity {
648    pub id: TurnActivityId,
649    pub correlation_id: TurnActivityId,
650    #[serde(flatten)]
651    pub event: TurnEvent,
652}
653
654impl TurnActivity {
655    pub fn new(correlation_id: TurnActivityId, event: TurnEvent) -> Self {
656        Self {
657            id: TurnActivityId::fresh(),
658            correlation_id,
659            event,
660        }
661    }
662
663    pub fn independent(event: TurnEvent) -> Self {
664        let correlation_id = TurnActivityId::fresh();
665        Self::new(correlation_id, event)
666    }
667}
668
669/// App-facing semantic event payload for a turn activity.
670///
671/// Unlike [`SessionEvent`], these events are stable application signals rather
672/// than low-level runtime/debug events. Public streams carry these payloads
673/// inside [`TurnActivity`] so every emitted item has identity.
674#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
675#[serde(tag = "type", rename_all = "snake_case")]
676#[allow(clippy::large_enum_variant)]
677pub enum TurnEvent {
678    QueuedWorkStarted {
679        boundary: crate::QueuedWorkClaimBoundary,
680        batch_ids: Vec<String>,
681        causes: Vec<crate::TurnCause>,
682    },
683    ModelRequestStarted {
684        protocol_iteration: usize,
685    },
686    AssistantProseDelta {
687        text: String,
688    },
689    ReasoningDelta {
690        text: String,
691    },
692    CodeBlockStarted {
693        language: String,
694        code: String,
695        #[serde(default, skip_serializing_if = "Option::is_none")]
696        graph_key: Option<String>,
697    },
698    CodeBlockCompleted {
699        language: String,
700        output: String,
701        error: Option<String>,
702        success: bool,
703        duration_ms: u64,
704        tool_call_ids: Vec<String>,
705        #[serde(default, skip_serializing_if = "Option::is_none")]
706        graph_key: Option<String>,
707    },
708    ToolCallStarted {
709        call_id: Option<String>,
710        name: String,
711        args: serde_json::Value,
712    },
713    ToolCallCompleted {
714        call_id: Option<String>,
715        name: String,
716        args: serde_json::Value,
717        output: crate::ToolCallOutput,
718        duration_ms: u64,
719    },
720    SubmittedValue {
721        value: serde_json::Value,
722    },
723    ToolValue {
724        tool_name: String,
725        value: serde_json::Value,
726    },
727    Usage {
728        protocol_iteration: usize,
729        usage: TokenUsage,
730        cumulative: TokenUsage,
731    },
732    ChildUsage {
733        session_id: String,
734        source: String,
735        model: String,
736        protocol_iteration: usize,
737        usage: TokenUsage,
738        cumulative: TokenUsage,
739    },
740    RetryStatus {
741        wait_seconds: u64,
742        attempt: usize,
743        max_attempts: usize,
744        reason: String,
745    },
746    PluginRuntime {
747        plugin_id: String,
748        event: crate::PluginRuntimeEvent,
749    },
750    QueuedInputAccepted {
751        checkpoint: crate::CheckpointKind,
752        inputs: Vec<crate::AcceptedInjectedTurnInput>,
753    },
754    QueuedMessagesCommitted {
755        messages: Vec<crate::PluginMessage>,
756        checkpoint: crate::CheckpointKind,
757    },
758    Error {
759        message: String,
760    },
761}
762
763#[async_trait::async_trait]
764pub trait TurnActivitySink: Send + Sync {
765    fn is_noop(&self) -> bool {
766        false
767    }
768
769    async fn emit(&self, activity: TurnActivity);
770}
771
772pub struct NoopTurnActivitySink;
773
774/// Static no-op turn-activity sink for callers that need a `&dyn TurnActivitySink` default.
775pub static NOOP_TURN_ACTIVITY_SINK: NoopTurnActivitySink = NoopTurnActivitySink;
776
777#[async_trait::async_trait]
778impl TurnActivitySink for NoopTurnActivitySink {
779    fn is_noop(&self) -> bool {
780        true
781    }
782
783    async fn emit(&self, _activity: TurnActivity) {}
784}
785
786/// Optional sinks and scoped effect controller passed to one of [`LashRuntime`]'s
787/// turn-driving entry points (`stream_turn`,
788/// `stream_turn_with_agent_frames`).
789///
790/// Construct via [`TurnOptions::new`] and chain `with_*` builders. Event sinks
791/// default to no-op sinks. Effect scope is explicit and required at every
792/// runtime boundary that can execute nondeterministic work.
793pub struct TurnOptions<'a> {
794    events: Option<&'a dyn EventSink>,
795    turn_events: Option<&'a dyn TurnActivitySink>,
796    scoped_effect_controller: ScopedEffectController<'a>,
797    cancel: CancellationToken,
798}
799
800impl<'a> TurnOptions<'a> {
801    pub fn new(
802        cancel: CancellationToken,
803        scoped_effect_controller: ScopedEffectController<'a>,
804    ) -> Self {
805        Self {
806            events: None,
807            turn_events: None,
808            scoped_effect_controller,
809            cancel,
810        }
811    }
812
813    pub fn with_events(mut self, events: &'a dyn EventSink) -> Self {
814        self.events = Some(events);
815        self
816    }
817
818    pub fn with_turn_events(mut self, turn_events: &'a dyn TurnActivitySink) -> Self {
819        self.turn_events = Some(turn_events);
820        self
821    }
822
823    pub(crate) fn events_or_noop(&self) -> &'a dyn EventSink {
824        self.events.unwrap_or(&NOOP_EVENT_SINK)
825    }
826
827    pub(crate) fn turn_events_or_noop(&self) -> &'a dyn TurnActivitySink {
828        self.turn_events.unwrap_or(&NOOP_TURN_ACTIVITY_SINK)
829    }
830
831    pub(crate) fn effect_scope_id(&self) -> &str {
832        self.scoped_effect_controller.scope_id()
833    }
834
835    pub(crate) fn scoped_effect_controller(&self) -> ScopedEffectController<'a> {
836        self.scoped_effect_controller.clone()
837    }
838}
839
840enum RuntimeStreamEvent {
841    Session(SessionEvent),
842    Turn(TurnActivity),
843}
844
845#[derive(Clone)]
846pub struct SessionStoreCreateRequest {
847    pub session_id: String,
848    pub relation: SessionRelation,
849    pub policy: SessionPolicy,
850}
851
852impl SessionStoreCreateRequest {
853    pub fn parent_session_id(&self) -> Option<&str> {
854        self.relation.parent_session_id()
855    }
856}
857
858#[async_trait::async_trait]
859pub trait SessionStoreFactory: Send + Sync {
860    /// Durability tier the stores produced by this factory provide; defaults to
861    /// [`DurabilityTier::Inline`].
862    fn durability_tier(&self) -> crate::DurabilityTier {
863        crate::DurabilityTier::Inline
864    }
865
866    async fn create_store(
867        &self,
868        request: &SessionStoreCreateRequest,
869    ) -> Result<Arc<dyn crate::store::RuntimePersistence>, String>;
870
871    async fn open_existing_store(
872        &self,
873        _request: &SessionStoreCreateRequest,
874    ) -> Result<Option<Arc<dyn crate::store::RuntimePersistence>>, String> {
875        Ok(None)
876    }
877
878    async fn delete_session(&self, session_id: &str) -> Result<(), String>;
879}
880
881/// Generic runtime for CLI or programmatic embedding.
882pub struct LashRuntime {
883    pub(in crate::runtime) session: Option<Session>,
884    pub(in crate::runtime) policy: SessionPolicy,
885    pub(in crate::runtime) host: RuntimeHost,
886    pub(in crate::runtime) services: RuntimeServices,
887    pub(in crate::runtime) state: RuntimeSessionState,
888    pub(in crate::runtime) runtime_scope_id: Arc<str>,
889    pub(in crate::runtime) managed_sessions: Arc<Mutex<HashMap<String, RuntimeHandle>>>,
890    pub(in crate::runtime) managed_turns: Arc<Mutex<HashMap<String, ManagedSessionTurn>>>,
891    /// Protocol-owned turn options for this session.
892    pub(in crate::runtime) protocol_turn_options: crate::ProtocolTurnOptions,
893    /// Session-scoped token cost ledger. Shared by ALL
894    /// `RuntimeSessionServices` instances created from this runtime
895    /// (both per-turn and async maintenance). Entries accumulate here
896    /// and are drained into `state.token_ledger` at turn-commit time.
897    pub(in crate::runtime) shared_token_ledger: Arc<std::sync::Mutex<Vec<TokenLedgerEntry>>>,
898    pub(in crate::runtime) process_sync_needed: Arc<AtomicBool>,
899    pub(in crate::runtime) turn_phase_probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
900    /// Resident-graph policy chosen by the host. Controls whether
901    /// [`LashRuntime::refresh_session_graph_from_store`] reloads the full
902    /// graph or just the active path, matching the trimming behavior set at
903    /// load time via [`apply_residency_on_load`](crate::runtime::apply_residency_on_load).
904    pub(in crate::runtime) residency: Residency,
905}