Skip to main content

lash_core/runtime/
mod.rs

1mod assembly;
2mod builder;
3pub(crate) mod causal;
4mod config_ops;
5mod effect;
6mod environment;
7mod error;
8mod host;
9mod in_memory_store;
10mod io;
11mod lifecycle;
12mod observation;
13mod process;
14mod process_work_runner;
15mod process_worker;
16mod queued_work_runner;
17mod session_api;
18mod session_manager;
19mod session_ops;
20mod state;
21#[cfg(test)]
22pub(crate) mod tests;
23mod turn_boundary;
24mod turn_commit_draft;
25mod turn_driver;
26mod turn_graph_editor;
27mod turn_loop;
28mod turn_queue;
29mod usage;
30
31use std::any::Any;
32use std::collections::HashMap;
33use std::fmt;
34use std::sync::Arc;
35use std::sync::Mutex as StdMutex;
36use std::sync::atomic::{AtomicBool, Ordering};
37
38use tokio::sync::{Mutex, mpsc};
39use tokio_util::sync::CancellationToken;
40
41use crate::llm::types::{
42    LlmOutputPart, LlmProviderTraceEvent, LlmProviderTraceSender, LlmRequest, LlmResponse,
43    LlmStreamEvent, LlmUsage,
44};
45use crate::plugin::{
46    CheckpointHookContext, PrepareTurnRequest, SessionConfigChangedContext, SessionRelation,
47};
48use crate::sansio::{LlmCallError, Response};
49use crate::session_model::{
50    Message, MessageRole, Part, PartKind, PruneState, RuntimeSessionPolicy, SessionEvent,
51    SessionPolicy, TokenUsage, fresh_message_id, make_error_event, reassign_part_ids, shared_parts,
52    transport_stream_events,
53};
54use crate::{
55    CheckpointKind, PersistentRuntimeServices, PluginActionInvokeError, PromptHookContext,
56    RuntimeServices, SandboxMessage, Session, SessionCreateRequest, SessionError, SessionHandle,
57    SessionSnapshot, SessionStartPoint, ToolCallRecord, TurnFinish, TurnOutcome, TurnStop,
58};
59use crate::{Effect, TurnMachine};
60
61use host::*;
62use session_manager::*;
63use turn_boundary::*;
64use turn_commit_draft::*;
65use turn_driver::*;
66
/// Time-to-live of a runtime turn lease, in milliseconds (15 minutes).
pub(crate) const RUNTIME_TURN_LEASE_TTL_MS: u64 = 15 * 60 * 1_000;
68
69// `PromptUsage` is re-exported below alongside the runtime's own types.
70pub use lash_sansio::PromptUsage;
71
72use assembly::{
73    LlmDebugText, LlmDebugToolCall, LlmStreamAccumulator, LlmStreamDebugState, LlmStreamEventLog,
74    LlmStreamState, LlmStreamSummary, TurnAssembler,
75};
76#[cfg(test)]
77#[allow(unused_imports)]
78use assembly::{classify_output_state, sanitize_assistant_output};
79pub use builder::EmbeddedRuntimeBuilder;
80pub use causal::process_event_invocation;
81pub(crate) use causal::tool_retry_sleep_invocation;
82pub(crate) use effect::RuntimeEffectControllerHandle;
83pub use effect::{
84    CausalRef, EffectHost, EffectScope, InlineEffectHost, InlineRuntimeEffectController,
85    LlmAttachmentSpec, LlmRequestSpec, ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand,
86    RuntimeEffectController, RuntimeEffectControllerError, RuntimeEffectEnvelope,
87    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
88    RuntimeReplay, RuntimeScope, RuntimeSubject, ScopedEffectController,
89};
90pub use environment::{ParkedSession, Residency, RuntimeEnvironment, RuntimeEnvironmentBuilder};
91pub use error::{DurableStoreFacet, RuntimeError, RuntimeErrorCode};
92pub use host::{EmbeddedRuntimeHost, ProcessRuntimeHost, RuntimeHostConfig};
93pub use in_memory_store::{InMemorySessionStore, InMemorySessionStoreFactory};
94use io::normalize_input_items;
95pub use observation::{
96    InMemoryLiveReplayStore, InMemoryLiveReplayStoreConfig, LiveReplayGap, LiveReplayGapReason,
97    LiveReplayResult, LiveReplayStore, LiveReplayStoreError, LiveReplaySubscribeResult,
98    LiveReplaySubscription, RuntimeHandle, RuntimeObservation, SessionCursor, SessionCursorError,
99    SessionObservation, SessionObservationEvent, SessionObservationEventPayload,
100    SessionObservationSubscription, SessionProcessEventKind, SessionQueueEventKind, SessionResume,
101    SessionRevision,
102};
103#[cfg(any(test, feature = "testing"))]
104pub use process::TestLocalProcessRegistry;
105pub use process::{
106    DefaultProcessCancelAbility, ObservedProcess, ObservedProcessEvent, ObservedWorkItem,
107    PROCESS_LEASE_SCHEMA_VERSION, PreparedProcessEventAppend, ProcessAwaitOutput,
108    ProcessCancelAbility, ProcessCancelAllRequest, ProcessCancelRequest, ProcessCancelSource,
109    ProcessCancelSummary, ProcessDefinitionSelector, ProcessDefinitionSummary, ProcessEvent,
110    ProcessEventAppendRequest, ProcessEventAppendResult, ProcessEventSemantics,
111    ProcessEventSemanticsSpec, ProcessEventType, ProcessExecutionContext, ProcessExternalRef,
112    ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry, ProcessHandleSummary,
113    ProcessId, ProcessInput, ProcessLease, ProcessLeaseCompletion, ProcessLifecycleStatus,
114    ProcessListFilter, ProcessListMode, ProcessOpScope, ProcessProvenance, ProcessRecord,
115    ProcessRegistration, ProcessRegistry, ProcessScope, ProcessScopeId, ProcessService,
116    ProcessSessionDeleteReport, ProcessStartGrant, ProcessStartOptions, ProcessStartRequest,
117    ProcessStatus, ProcessStatusFilter, ProcessTerminalSemantics, ProcessTerminalSpec,
118    ProcessTerminalState, ProcessValueSelector, ProcessWake, ProcessWakeDedupeKey,
119    ProcessWakeDelivery, ProcessWakeSpec, ProcessWorkObserver, ProcessWorkSnapshot,
120    UnavailableProcessService, current_epoch_ms, epoch_ms_from_system_time,
121    lashlang_process_event_types, materialize_process_event_semantics,
122    prepare_process_event_append, prepare_process_registration, process_event_payload_hash,
123    process_wake_delivery, process_wake_input_from_event_payload, process_wake_turn_cause,
124    process_wake_turn_text, require_event_replay, system_time_from_epoch_ms,
125};
126pub use process_work_runner::{
127    InlineProcessRunHandle, ProcessRunHandle, ProcessWorkDriver, ProcessWorkPoke, ProcessWorkRunner,
128};
129pub use process_worker::{DurableProcessWorker, DurableProcessWorkerConfig};
130pub use queued_work_runner::{
131    QueuedWorkPoke, QueuedWorkRunHandle, QueuedWorkRunOutcome, QueuedWorkRunRequest,
132    QueuedWorkRunner,
133};
134pub use session_manager::DirectCompletionClient;
135pub use state::RuntimeSessionState;
136use state::{
137    append_session_nodes_to_state, apply_residency_on_load, apply_session_checkpoint,
138    apply_session_head, normalize_session_graph, open_agent_frame_in_state,
139};
140pub use turn_loop::ensure_durable_effect_input;
141pub use turn_queue::{
142    DeliveryPolicy, MergeKey, QUEUED_WORK_CLAIM_TTL_MS, QueuedCheckpointWork, QueuedTurnWork,
143    QueuedWorkBatch, QueuedWorkBatchDraft, QueuedWorkClaim, QueuedWorkClaimBoundary,
144    QueuedWorkCompletion, QueuedWorkItem, QueuedWorkPayload, SessionCommand, SessionCommandReceipt,
145    SlotPolicy, process_wake_batch_draft,
146};
147pub use usage::{
148    SessionUsageReport, TokenLedgerEntry, UsageReportRow, UsageTotals, diff_token_ledger,
149    diff_usage_reports,
150};
151use usage::{merge_ledger_entry, merge_usage_delta_entries, normalize_prompt_usage};
152
/// Typed phase marker handed to [`RuntimeTurnPhaseProbe::begin`] and
/// [`RuntimeTurnPhaseProbe::end`].
#[doc(hidden)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RuntimeTurnPhase {
    ContextTransform,
    BeforeTurnHooks,
    PromptBuild,
    EffectLoop,
    FinalizeTurn,
    PersistTurn,
    FinalCommit,
    PostPersistHooks,
}
165
166#[doc(hidden)]
167pub trait RuntimeTurnPhaseProbe: Send + Sync {
168    fn begin(&self, phase: RuntimeTurnPhase);
169    fn end(&self, phase: RuntimeTurnPhase);
170    fn begin_named(&self, _phase: &str) {}
171    fn end_named(&self, _phase: &str) {}
172}
173
174/// Host-provided per-turn input.
175#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
176#[serde(tag = "type", rename_all = "snake_case")]
177pub enum InputItem {
178    Text { text: String },
179    ImageRef { id: String },
180}
181
182impl InputItem {
183    pub fn text(text: impl Into<String>) -> Self {
184        Self::Text { text: text.into() }
185    }
186
187    pub fn image_ref(id: impl Into<String>) -> Self {
188        Self::ImageRef { id: id.into() }
189    }
190}
191
192/// Host-provided per-turn input.
193#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
194pub struct TurnInput {
195    pub items: Vec<InputItem>,
196    #[serde(default)]
197    pub image_blobs: HashMap<String, Vec<u8>>,
198    /// Per-turn override for protocol-owned turn options.
199    #[serde(default, skip_serializing_if = "Option::is_none")]
200    pub protocol_turn_options: Option<crate::ProtocolTurnOptions>,
201    /// Optional externally-stable trace turn id. Normal runtime callers leave
202    /// this empty and the runtime generates one per outer turn.
203    #[serde(default, skip_serializing_if = "Option::is_none")]
204    pub trace_turn_id: Option<String>,
205    #[serde(skip)]
206    pub protocol_extension: Option<ProtocolTurnExtensionHandle>,
207    #[serde(skip)]
208    pub turn_context: TurnContext,
209}
210
211impl TurnInput {
212    pub fn empty() -> Self {
213        Self::items(std::iter::empty())
214    }
215
216    pub fn text(text: impl Into<String>) -> Self {
217        Self::items([InputItem::text(text)])
218    }
219
220    pub fn items(items: impl IntoIterator<Item = InputItem>) -> Self {
221        Self {
222            items: items.into_iter().collect(),
223            image_blobs: HashMap::new(),
224            protocol_turn_options: None,
225            trace_turn_id: None,
226            protocol_extension: None,
227            turn_context: TurnContext::default(),
228        }
229    }
230
231    pub fn with_image_blob(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
232        self.image_blobs.insert(id.into(), bytes);
233        self
234    }
235
236    pub fn with_image_blobs<I, K>(mut self, image_blobs: I) -> Self
237    where
238        I: IntoIterator<Item = (K, Vec<u8>)>,
239        K: Into<String>,
240    {
241        self.image_blobs.extend(
242            image_blobs
243                .into_iter()
244                .map(|(id, bytes)| (id.into(), bytes)),
245        );
246        self
247    }
248
249    pub fn with_image_ref(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
250        let id = id.into();
251        self.items.push(InputItem::image_ref(id.clone()));
252        self.image_blobs.insert(id, bytes);
253        self
254    }
255
256    pub fn with_protocol_turn_options(mut self, options: crate::ProtocolTurnOptions) -> Self {
257        self.protocol_turn_options = Some(options);
258        self
259    }
260
261    pub fn with_trace_turn_id(mut self, trace_turn_id: impl Into<String>) -> Self {
262        self.trace_turn_id = Some(trace_turn_id.into());
263        self
264    }
265}
266
267/// Per-turn, in-process side channel of typed plugin inputs.
268///
269/// This is an `Any`-keyed map of live Rust values handed to plugins for a
270/// single turn. It is deliberately **not** serializable: the values never
271/// survive a process boundary, so durable effect-host runs explicitly reject a
272/// turn that carries any live inputs (see
273/// [`LiveTurnInputs::durable_effect_rejection`]). Durable callers must instead
274/// encode replayable data in
275/// `protocol_turn_options` or persisted plugin state.
276#[derive(Clone, Default)]
277pub struct LiveTurnInputs {
278    inputs: HashMap<&'static str, Arc<dyn Any + Send + Sync>>,
279}
280
281impl LiveTurnInputs {
282    fn insert<T>(&mut self, plugin_id: &'static str, input: T)
283    where
284        T: Send + Sync + 'static,
285    {
286        self.inputs.insert(plugin_id, Arc::new(input));
287    }
288
289    fn get<T>(&self, plugin_id: &'static str) -> Option<&T>
290    where
291        T: 'static,
292    {
293        self.inputs
294            .get(plugin_id)
295            .and_then(|input| input.downcast_ref::<T>())
296    }
297
298    fn contains(&self, plugin_id: &'static str) -> bool {
299        self.inputs.contains_key(plugin_id)
300    }
301
302    fn is_empty(&self) -> bool {
303        self.inputs.is_empty()
304    }
305
306    pub fn plugin_ids(&self) -> Vec<&'static str> {
307        self.inputs.keys().copied().collect()
308    }
309
310    /// Returns an error when live per-turn inputs would make a durable effect
311    /// host replay depend on process-local values.
312    pub(crate) fn durable_effect_rejection(&self) -> Result<(), RuntimeError> {
313        if self.is_empty() {
314            return Ok(());
315        }
316        Err(RuntimeError::new(
317            RuntimeErrorCode::DurableEffectLivePluginInput,
318            "durable effect hosts do not support live TurnContext plugin inputs; encode replayable data in protocol_turn_options or persisted plugin state",
319        ))
320    }
321}
322
323#[derive(Clone, Default)]
324pub struct TurnContext {
325    plugin_inputs: LiveTurnInputs,
326    provider: Option<crate::ProviderHandle>,
327    model: Option<crate::ModelSpec>,
328    prompt: crate::PromptLayer,
329}
330
331impl TurnContext {
332    pub fn new() -> Self {
333        Self::default()
334    }
335
336    pub fn insert_plugin_input<T>(&mut self, plugin_id: &'static str, input: T)
337    where
338        T: Send + Sync + 'static,
339    {
340        self.plugin_inputs.insert(plugin_id, input);
341    }
342
343    pub fn set_provider(&mut self, provider: crate::ProviderHandle) {
344        self.provider = Some(provider);
345    }
346
347    pub fn provider(&self) -> Option<&crate::ProviderHandle> {
348        self.provider.as_ref()
349    }
350
351    pub fn set_model(&mut self, model: crate::ModelSpec) {
352        self.model = Some(model);
353    }
354
355    pub fn model_spec(&self) -> Option<&crate::ModelSpec> {
356        self.model.as_ref()
357    }
358
359    pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
360    where
361        T: 'static,
362    {
363        self.plugin_inputs.get(plugin_id)
364    }
365
366    pub fn has_plugin_input(&self, plugin_id: &'static str) -> bool {
367        self.plugin_inputs.contains(plugin_id)
368    }
369
370    pub fn has_live_plugin_inputs(&self) -> bool {
371        !self.plugin_inputs.is_empty()
372    }
373
374    pub fn live_plugin_input_ids(&self) -> Vec<&'static str> {
375        self.plugin_inputs.plugin_ids()
376    }
377
378    /// Live plugin inputs for this turn. The durable boundary inspects this to
379    /// reject turns carrying non-serializable live state.
380    pub(crate) fn live_plugin_inputs(&self) -> &LiveTurnInputs {
381        &self.plugin_inputs
382    }
383
384    pub fn set_prompt_template(&mut self, template: crate::PromptTemplate) {
385        self.prompt.template = Some(template);
386    }
387
388    pub fn add_prompt_contribution(&mut self, contribution: crate::PromptContribution) {
389        self.prompt.add_contribution(contribution);
390    }
391
392    pub fn replace_prompt_slot(
393        &mut self,
394        slot: crate::PromptSlot,
395        contributions: impl IntoIterator<Item = crate::PromptContribution>,
396    ) {
397        self.prompt.replace_slot(slot, contributions);
398    }
399
400    pub fn clear_prompt_slot(&mut self, slot: crate::PromptSlot) {
401        self.prompt.clear_slot(slot);
402    }
403
404    pub fn set_prompt_layer(&mut self, prompt: crate::PromptLayer) {
405        self.prompt = prompt;
406    }
407
408    pub fn prompt_layer(&self) -> &crate::PromptLayer {
409        &self.prompt
410    }
411}
412
413impl fmt::Debug for TurnContext {
414    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
415        f.debug_struct("TurnContext")
416            .field("plugin_inputs", &self.plugin_inputs.plugin_ids())
417            .field("has_provider", &self.provider.is_some())
418            .field("has_model", &self.model.is_some())
419            .field("has_prompt_layer", &(!self.prompt.is_empty()))
420            .finish()
421    }
422}
423
424#[derive(Clone)]
425pub struct ProtocolTurnExtensionHandle(Arc<dyn ProtocolTurnExtension>);
426
427impl ProtocolTurnExtensionHandle {
428    pub fn new(extension: impl ProtocolTurnExtension + 'static) -> Self {
429        Self(Arc::new(extension))
430    }
431
432    pub fn as_any(&self) -> &dyn Any {
433        self.0.as_any()
434    }
435
436    pub fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
437        self.0.prompt_contributions()
438    }
439}
440
441impl fmt::Debug for ProtocolTurnExtensionHandle {
442    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
443        f.write_str("ProtocolTurnExtensionHandle(..)")
444    }
445}
446
447pub trait ProtocolTurnExtension: Send + Sync {
448    fn as_any(&self) -> &dyn Any;
449
450    fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
451        Vec::new()
452    }
453}
454
/// Shared, clonable handle around a [`ProtocolSessionExtension`] trait object.
#[derive(Clone)]
pub struct ProtocolSessionExtensionHandle(Arc<dyn ProtocolSessionExtension>);

impl ProtocolSessionExtensionHandle {
    /// Wraps `extension` in a reference-counted handle.
    pub fn new(extension: impl ProtocolSessionExtension + 'static) -> Self {
        let shared: Arc<dyn ProtocolSessionExtension> = Arc::new(extension);
        Self(shared)
    }

    /// The wrapped extension as `&dyn Any`, for downcasting by the caller.
    pub fn as_any(&self) -> &dyn Any {
        self.0.as_any()
    }
}

/// Opaque debug output; the wrapped extension is not required to be `Debug`.
impl fmt::Debug for ProtocolSessionExtensionHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ProtocolSessionExtensionHandle(..)")
    }
}

/// Extension value that can be wrapped in a
/// [`ProtocolSessionExtensionHandle`].
pub trait ProtocolSessionExtension: Send + Sync {
    /// Exposes the implementor as `&dyn Any` so callers can downcast to the
    /// concrete type.
    fn as_any(&self) -> &dyn Any;
}
477
478#[derive(Clone, Debug)]
479pub(super) enum NormalizedItem {
480    Text(String),
481    Image(crate::AttachmentRef),
482}
483
484/// Canonical assistant output payload.
485#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
486pub struct AssistantOutput {
487    pub safe_text: String,
488    pub raw_text: String,
489    pub state: OutputState,
490}
491
492/// Quality and usability of assembled terminal output.
493#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
494#[serde(rename_all = "snake_case")]
495pub enum OutputState {
496    Usable,
497    EmptyOutput,
498    TracebackOnly,
499    RecoveredFromError,
500}
501
502/// RLM code execution output observed during a turn.
503#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
504pub struct CodeOutputRecord {
505    pub output: String,
506    #[serde(default, skip_serializing_if = "Option::is_none")]
507    pub error: Option<String>,
508}
509
510/// High-level execution summary for a completed turn.
511#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
512pub struct ExecutionSummary {
513    #[serde(default)]
514    pub had_tool_calls: bool,
515    #[serde(default)]
516    pub had_code_execution: bool,
517}
518
519/// Structured issue surfaced during turn execution.
520#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
521pub struct TurnIssue {
522    pub kind: String,
523    #[serde(default, skip_serializing_if = "Option::is_none")]
524    pub code: Option<String>,
525    #[serde(default, skip_serializing_if = "Option::is_none")]
526    pub terminal_reason: Option<crate::LlmTerminalReason>,
527    pub message: String,
528    #[serde(default, skip_serializing_if = "Option::is_none")]
529    pub raw: Option<String>,
530}
531
532/// Canonical high-level turn result returned to hosts.
533#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
534pub struct AssembledTurn {
535    pub state: SessionSnapshot,
536    pub outcome: crate::TurnOutcome,
537    pub assistant_output: AssistantOutput,
538    pub execution: ExecutionSummary,
539    #[serde(default)]
540    pub token_usage: TokenUsage,
541    /// Per-(session, source, model) ledger entries for child sessions whose
542    /// LLM calls completed during this turn. `token_usage` above is the
543    /// parent's own LLM tokens; `total_usage` (on the embed-facing
544    /// `TurnResult`) sums both.
545    #[serde(default)]
546    pub children_usage: Vec<TokenLedgerEntry>,
547    #[serde(default)]
548    pub tool_calls: Vec<ToolCallRecord>,
549    #[serde(default)]
550    pub errors: Vec<TurnIssue>,
551}
552
553/// Result of driving one logical host turn through any AgentFrame switches.
554///
555/// A frame switch is an internal runtime continuation, similar to compaction
556/// from a host's perspective. Callers that need a final answer can use
557/// [`LashRuntime::stream_turn_with_agent_frames`] and inspect `final_turn()`.
558#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
559pub struct AgentFrameRun {
560    pub turns: Vec<AssembledTurn>,
561}
562
563impl AgentFrameRun {
564    pub fn final_turn(&self) -> Option<&AssembledTurn> {
565        self.turns.last()
566    }
567
568    pub fn into_final_turn(mut self) -> Option<AssembledTurn> {
569        self.turns.pop()
570    }
571
572    pub fn frame_switch_count(&self) -> usize {
573        self.turns
574            .iter()
575            .filter(|turn| matches!(turn.outcome, crate::TurnOutcome::AgentFrameSwitch { .. }))
576            .count()
577    }
578}
579
580/// Termination policy knobs.
581#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
582pub struct TerminationPolicy {
583    #[serde(default)]
584    pub treat_missing_done_as_failure: bool,
585}
586
587impl Default for TerminationPolicy {
588    fn default() -> Self {
589        Self {
590            treat_missing_done_as_failure: true,
591        }
592    }
593}
594
595/// Host event sink for low-level streaming runtime events.
596/// `SessionEvent` is protocol-specific preview/progress data.
597#[async_trait::async_trait]
598pub trait EventSink: Send + Sync {
599    fn is_noop(&self) -> bool {
600        false
601    }
602
603    async fn emit(&self, event: SessionEvent);
604}
605
606/// No-op sink useful for callers that only care about final state.
607pub struct NoopEventSink;
608
609/// Static no-op event sink for callers that need a `&dyn EventSink` default.
610pub static NOOP_EVENT_SINK: NoopEventSink = NoopEventSink;
611
612#[async_trait::async_trait]
613impl EventSink for NoopEventSink {
614    fn is_noop(&self) -> bool {
615        true
616    }
617
618    async fn emit(&self, _event: SessionEvent) {}
619}
620
621/// Stable identifier for a semantic turn activity.
622#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
623#[serde(transparent)]
624pub struct TurnActivityId(pub String);
625
626impl TurnActivityId {
627    pub fn new(id: impl Into<String>) -> Self {
628        Self(id.into())
629    }
630
631    pub fn fresh() -> Self {
632        Self(uuid::Uuid::new_v4().to_string())
633    }
634}
635
636/// App-facing semantic activity emitted during a turn.
637///
638/// `id` is unique per emitted activity event. `correlation_id` groups related
639/// events in the same logical activity, such as code start/completion, tool
640/// start/completion, or text deltas from one output block.
641#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
642pub struct TurnActivity {
643    pub id: TurnActivityId,
644    pub correlation_id: TurnActivityId,
645    #[serde(flatten)]
646    pub event: TurnEvent,
647}
648
649impl TurnActivity {
650    pub fn new(correlation_id: TurnActivityId, event: TurnEvent) -> Self {
651        Self {
652            id: TurnActivityId::fresh(),
653            correlation_id,
654            event,
655        }
656    }
657
658    pub fn independent(event: TurnEvent) -> Self {
659        let correlation_id = TurnActivityId::fresh();
660        Self::new(correlation_id, event)
661    }
662}
663
664/// App-facing semantic event payload for a turn activity.
665///
666/// Unlike [`SessionEvent`], these events are stable application signals rather
667/// than low-level runtime/debug events. Public streams carry these payloads
668/// inside [`TurnActivity`] so every emitted item has identity.
669#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
670#[serde(tag = "type", rename_all = "snake_case")]
671#[allow(clippy::large_enum_variant)]
672pub enum TurnEvent {
673    QueuedWorkStarted {
674        boundary: crate::QueuedWorkClaimBoundary,
675        batch_ids: Vec<String>,
676        causes: Vec<crate::TurnCause>,
677    },
678    ModelRequestStarted {
679        protocol_iteration: usize,
680    },
681    AssistantProseDelta {
682        text: String,
683    },
684    ReasoningDelta {
685        text: String,
686    },
687    CodeBlockStarted {
688        language: String,
689        code: String,
690        #[serde(default, skip_serializing_if = "Option::is_none")]
691        graph_key: Option<String>,
692    },
693    CodeBlockCompleted {
694        language: String,
695        output: String,
696        error: Option<String>,
697        success: bool,
698        duration_ms: u64,
699        tool_call_ids: Vec<String>,
700        #[serde(default, skip_serializing_if = "Option::is_none")]
701        graph_key: Option<String>,
702    },
703    ToolCallStarted {
704        call_id: Option<String>,
705        name: String,
706        args: serde_json::Value,
707    },
708    ToolCallCompleted {
709        call_id: Option<String>,
710        name: String,
711        args: serde_json::Value,
712        output: crate::ToolCallOutput,
713        duration_ms: u64,
714    },
715    SubmittedValue {
716        value: serde_json::Value,
717    },
718    ToolValue {
719        tool_name: String,
720        value: serde_json::Value,
721    },
722    Usage {
723        protocol_iteration: usize,
724        usage: TokenUsage,
725        cumulative: TokenUsage,
726    },
727    ChildUsage {
728        session_id: String,
729        source: String,
730        model: String,
731        protocol_iteration: usize,
732        usage: TokenUsage,
733        cumulative: TokenUsage,
734    },
735    RetryStatus {
736        wait_seconds: u64,
737        attempt: usize,
738        max_attempts: usize,
739        reason: String,
740    },
741    PluginRuntime {
742        plugin_id: String,
743        event: crate::PluginRuntimeEvent,
744    },
745    QueuedInputAccepted {
746        checkpoint: crate::CheckpointKind,
747        inputs: Vec<crate::AcceptedInjectedTurnInput>,
748    },
749    QueuedMessagesCommitted {
750        messages: Vec<crate::PluginMessage>,
751        checkpoint: crate::CheckpointKind,
752    },
753    Error {
754        message: String,
755    },
756}
757
758#[async_trait::async_trait]
759pub trait TurnActivitySink: Send + Sync {
760    fn is_noop(&self) -> bool {
761        false
762    }
763
764    async fn emit(&self, activity: TurnActivity);
765}
766
767pub struct NoopTurnActivitySink;
768
769/// Static no-op turn-activity sink for callers that need a `&dyn TurnActivitySink` default.
770pub static NOOP_TURN_ACTIVITY_SINK: NoopTurnActivitySink = NoopTurnActivitySink;
771
772#[async_trait::async_trait]
773impl TurnActivitySink for NoopTurnActivitySink {
774    fn is_noop(&self) -> bool {
775        true
776    }
777
778    async fn emit(&self, _activity: TurnActivity) {}
779}
780
781/// Optional sinks and scoped effect controller passed to one of [`LashRuntime`]'s
782/// turn-driving entry points (`stream_turn`,
783/// `stream_turn_with_agent_frames`).
784///
785/// Construct via [`TurnOptions::new`] and chain `with_*` builders. Event sinks
786/// default to no-op sinks. Effect scope is explicit and required at every
787/// runtime boundary that can execute nondeterministic work.
788pub struct TurnOptions<'a> {
789    events: Option<&'a dyn EventSink>,
790    turn_events: Option<&'a dyn TurnActivitySink>,
791    scoped_effect_controller: ScopedEffectController<'a>,
792    cancel: CancellationToken,
793}
794
795impl<'a> TurnOptions<'a> {
796    pub fn new(
797        cancel: CancellationToken,
798        scoped_effect_controller: ScopedEffectController<'a>,
799    ) -> Self {
800        Self {
801            events: None,
802            turn_events: None,
803            scoped_effect_controller,
804            cancel,
805        }
806    }
807
808    pub fn with_events(mut self, events: &'a dyn EventSink) -> Self {
809        self.events = Some(events);
810        self
811    }
812
813    pub fn with_turn_events(mut self, turn_events: &'a dyn TurnActivitySink) -> Self {
814        self.turn_events = Some(turn_events);
815        self
816    }
817
818    pub(crate) fn events_or_noop(&self) -> &'a dyn EventSink {
819        self.events.unwrap_or(&NOOP_EVENT_SINK)
820    }
821
822    pub(crate) fn turn_events_or_noop(&self) -> &'a dyn TurnActivitySink {
823        self.turn_events.unwrap_or(&NOOP_TURN_ACTIVITY_SINK)
824    }
825
826    pub(crate) fn effect_scope_id(&self) -> &str {
827        self.scoped_effect_controller.scope_id()
828    }
829
830    pub(crate) fn scoped_effect_controller(&self) -> ScopedEffectController<'a> {
831        self.scoped_effect_controller.clone()
832    }
833}
834
835enum RuntimeStreamEvent {
836    Session(SessionEvent),
837    Turn(TurnActivity),
838}
839
840#[derive(Clone)]
841pub struct SessionStoreCreateRequest {
842    pub session_id: String,
843    pub relation: SessionRelation,
844    pub policy: SessionPolicy,
845}
846
847impl SessionStoreCreateRequest {
848    pub fn parent_session_id(&self) -> Option<&str> {
849        self.relation.parent_session_id()
850    }
851}
852
853#[async_trait::async_trait]
854pub trait SessionStoreFactory: Send + Sync {
855    /// Durability tier the stores produced by this factory provide; defaults to
856    /// [`DurabilityTier::Inline`].
857    fn durability_tier(&self) -> crate::DurabilityTier {
858        crate::DurabilityTier::Inline
859    }
860
861    async fn create_store(
862        &self,
863        request: &SessionStoreCreateRequest,
864    ) -> Result<Arc<dyn crate::store::RuntimePersistence>, String>;
865
866    async fn delete_session(&self, session_id: &str) -> Result<(), String>;
867}
868
869/// Generic runtime for CLI or programmatic embedding.
870pub struct LashRuntime {
871    pub(in crate::runtime) session: Option<Session>,
872    pub(in crate::runtime) policy: SessionPolicy,
873    pub(in crate::runtime) host: RuntimeHost,
874    pub(in crate::runtime) services: RuntimeServices,
875    pub(in crate::runtime) state: RuntimeSessionState,
876    pub(in crate::runtime) runtime_scope_id: Arc<str>,
877    pub(in crate::runtime) managed_sessions: Arc<Mutex<HashMap<String, RuntimeHandle>>>,
878    pub(in crate::runtime) managed_turns: Arc<Mutex<HashMap<String, ManagedSessionTurn>>>,
879    /// Protocol-owned turn options for this session.
880    pub(in crate::runtime) protocol_turn_options: crate::ProtocolTurnOptions,
881    /// Session-scoped token cost ledger. Shared by ALL
882    /// `RuntimeSessionServices` instances created from this runtime
883    /// (both per-turn and async maintenance). Entries accumulate here
884    /// and are drained into `state.token_ledger` at turn-commit time.
885    pub(in crate::runtime) shared_token_ledger: Arc<std::sync::Mutex<Vec<TokenLedgerEntry>>>,
886    pub(in crate::runtime) process_sync_needed: Arc<AtomicBool>,
887    pub(in crate::runtime) turn_phase_probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
888    /// Resident-graph policy chosen by the host. Controls whether
889    /// [`LashRuntime::refresh_session_graph_from_store`] reloads the full
890    /// graph or just the active path, matching the trimming behavior set at
891    /// load time via [`apply_residency_on_load`](crate::runtime::apply_residency_on_load).
892    pub(in crate::runtime) residency: Residency,
893}