1mod assembly;
2mod builder;
3mod config_ops;
4mod environment;
5mod host;
6mod io;
7mod lifecycle;
8mod observation;
9mod session_api;
10mod session_manager;
11mod session_ops;
12mod state;
13#[cfg(test)]
14mod tests;
15mod turn_commit_pipeline;
16mod turn_driver;
17mod turn_graph;
18mod turn_loop;
19mod turn_progress;
20mod usage;
21
22use std::any::Any;
23use std::collections::HashMap;
24use std::fmt;
25use std::sync::Arc;
26use std::sync::Mutex as StdMutex;
27use std::sync::atomic::{AtomicBool, Ordering};
28
29use tokio::sync::{Mutex, mpsc};
30use tokio_util::sync::CancellationToken;
31
32use crate::llm::types::{
33 LlmOutputPart, LlmProviderTraceEvent, LlmProviderTraceSender, LlmRequest, LlmResponse,
34 LlmStreamEvent, LlmUsage,
35};
36use crate::plugin::runtime_host::RuntimeSessionHost;
37use crate::plugin::{
38 CheckpointHookContext, PluginMessage, PrepareTurnRequest, SessionConfigChangedContext,
39};
40use crate::sansio::{LlmCallError, Response};
41use crate::session_model::{
42 Message, MessageRole, Part, PartKind, PruneState, SessionEvent, SessionPolicy, TokenUsage,
43 fresh_message_id, make_error_event, reassign_part_ids, shared_parts, transport_stream_events,
44};
45use crate::{
46 CheckpointKind, ExecutionMode, PersistentRuntimeServices, PluginActionInvokeError,
47 PromptHookContext, RuntimeServices, SandboxMessage, Session, SessionCreateRequest,
48 SessionError, SessionHandle, SessionSnapshot, SessionStartPoint, ToolCallRecord, TurnFinish,
49 TurnOutcome, TurnStop,
50};
51use crate::{Effect, TurnMachine};
52
53use host::*;
54use session_manager::*;
55use turn_commit_pipeline::*;
56use turn_driver::*;
57use turn_progress::*;
58
59pub use lash_sansio::PromptUsage;
61
62use assembly::{
63 LlmDebugText, LlmDebugToolCall, LlmStreamDebugState, LlmStreamEventLog, LlmStreamSummary,
64 StandardStreamAccumulator, StandardStreamState, TurnAssembler,
65};
66#[cfg(test)]
67#[allow(unused_imports)]
68use assembly::{classify_output_state, sanitize_assistant_output};
69pub use builder::EmbeddedRuntimeBuilder;
70pub use environment::{ParkedSession, Residency, RuntimeEnvironment, RuntimeEnvironmentBuilder};
71pub use host::{
72 BackgroundCancelPolicy, BackgroundClosePolicy, BackgroundRuntimeHost, BackgroundTaskAttempt,
73 BackgroundTaskEvent, BackgroundTaskFilter, BackgroundTaskHost, BackgroundTaskId,
74 BackgroundTaskKind, BackgroundTaskOutcome, BackgroundTaskRecord, BackgroundTaskRegistration,
75 BackgroundTaskScope, BackgroundTaskState, EmbeddedRuntimeHost, LocalBackgroundTaskCancel,
76 LocalBackgroundTaskHost, RuntimeCoreConfig,
77};
78use io::normalize_input_items;
79pub use observation::{RuntimeHandle, RuntimeObservation};
80pub use state::{PersistedSessionState, SessionStateEnvelope};
81use state::{
82 append_session_nodes_to_state, apply_residency_on_load, apply_session_checkpoint,
83 apply_session_head, normalize_session_graph,
84};
85pub use usage::{
86 SessionUsageReport, TokenLedgerEntry, UsageReportRow, UsageTotals, diff_token_ledger,
87 diff_usage_reports,
88};
89use usage::{merge_ledger_entry, merge_usage_delta_entries, normalize_prompt_usage};
90
91#[doc(hidden)]
92#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
93pub enum RuntimeTurnPhase {
94 ContextTransform,
95 BeforeTurnHooks,
96 PromptBuild,
97 EffectLoop,
98 FinalizeTurn,
99 PersistTurn,
100}
101
102#[doc(hidden)]
103pub trait RuntimeTurnPhaseProbe: Send + Sync {
104 fn begin(&self, phase: RuntimeTurnPhase);
105 fn end(&self, phase: RuntimeTurnPhase);
106}
107
108#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
110#[serde(tag = "type", rename_all = "snake_case")]
111pub enum InputItem {
112 Text { text: String },
113 ImageRef { id: String },
114}
115
116impl InputItem {
117 pub fn text(text: impl Into<String>) -> Self {
118 Self::Text { text: text.into() }
119 }
120
121 pub fn image_ref(id: impl Into<String>) -> Self {
122 Self::ImageRef { id: id.into() }
123 }
124}
125
126#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
128pub struct TurnInput {
129 pub items: Vec<InputItem>,
130 #[serde(default)]
131 pub image_blobs: HashMap<String, Vec<u8>>,
132 #[serde(default, skip_serializing_if = "Option::is_none")]
134 pub mode_turn_options: Option<crate::ModeTurnOptions>,
135 #[serde(default, skip_serializing_if = "Option::is_none")]
138 pub trace_turn_id: Option<String>,
139 #[serde(skip)]
140 pub mode_extension: Option<ModeTurnExtensionHandle>,
141 #[serde(skip)]
142 pub turn_context: TurnContext,
143}
144
145impl TurnInput {
146 pub fn empty() -> Self {
147 Self::items(std::iter::empty())
148 }
149
150 pub fn text(text: impl Into<String>) -> Self {
151 Self::items([InputItem::text(text)])
152 }
153
154 pub fn items(items: impl IntoIterator<Item = InputItem>) -> Self {
155 Self {
156 items: items.into_iter().collect(),
157 image_blobs: HashMap::new(),
158 mode_turn_options: None,
159 trace_turn_id: None,
160 mode_extension: None,
161 turn_context: TurnContext::default(),
162 }
163 }
164
165 pub fn with_image_blob(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
166 self.image_blobs.insert(id.into(), bytes);
167 self
168 }
169
170 pub fn with_image_blobs<I, K>(mut self, image_blobs: I) -> Self
171 where
172 I: IntoIterator<Item = (K, Vec<u8>)>,
173 K: Into<String>,
174 {
175 self.image_blobs.extend(
176 image_blobs
177 .into_iter()
178 .map(|(id, bytes)| (id.into(), bytes)),
179 );
180 self
181 }
182
183 pub fn with_image_ref(mut self, id: impl Into<String>, bytes: Vec<u8>) -> Self {
184 let id = id.into();
185 self.items.push(InputItem::image_ref(id.clone()));
186 self.image_blobs.insert(id, bytes);
187 self
188 }
189
190 pub fn with_mode_turn_options(mut self, options: crate::ModeTurnOptions) -> Self {
191 self.mode_turn_options = Some(options);
192 self
193 }
194
195 pub fn with_trace_turn_id(mut self, trace_turn_id: impl Into<String>) -> Self {
196 self.trace_turn_id = Some(trace_turn_id.into());
197 self
198 }
199}
200
201#[derive(Clone, Default)]
202pub struct TurnContext {
203 plugin_inputs: HashMap<&'static str, Arc<dyn Any + Send + Sync>>,
204 provider: Option<crate::ProviderHandle>,
205 model: Option<String>,
206 model_variant: Option<Option<String>>,
207 prompt: crate::PromptLayer,
208}
209
210impl TurnContext {
211 pub fn new() -> Self {
212 Self::default()
213 }
214
215 pub fn insert_plugin_input<T>(&mut self, plugin_id: &'static str, input: T)
216 where
217 T: Send + Sync + 'static,
218 {
219 self.plugin_inputs.insert(plugin_id, Arc::new(input));
220 }
221
222 pub fn set_provider(&mut self, provider: crate::ProviderHandle) {
223 self.provider = Some(provider);
224 }
225
226 pub fn provider(&self) -> Option<&crate::ProviderHandle> {
227 self.provider.as_ref()
228 }
229
230 pub fn set_model(&mut self, model: impl Into<String>, variant: Option<String>) {
231 self.model = Some(model.into());
232 self.model_variant = Some(variant);
233 }
234
235 pub fn model_selection(&self) -> Option<(&str, Option<&str>)> {
236 self.model.as_deref().map(|model| {
237 (
238 model,
239 self.model_variant
240 .as_ref()
241 .and_then(|variant| variant.as_deref()),
242 )
243 })
244 }
245
246 pub fn plugin_input<T>(&self, plugin_id: &'static str) -> Option<&T>
247 where
248 T: 'static,
249 {
250 self.plugin_inputs
251 .get(plugin_id)
252 .and_then(|input| input.downcast_ref::<T>())
253 }
254
255 pub fn has_plugin_input(&self, plugin_id: &'static str) -> bool {
256 self.plugin_inputs.contains_key(plugin_id)
257 }
258
259 pub fn set_prompt_template(&mut self, template: crate::PromptTemplate) {
260 self.prompt.template = Some(template);
261 }
262
263 pub fn add_prompt_contribution(&mut self, contribution: crate::PromptContribution) {
264 self.prompt.add_contribution(contribution);
265 }
266
267 pub fn replace_prompt_slot(
268 &mut self,
269 slot: crate::PromptSlot,
270 contributions: impl IntoIterator<Item = crate::PromptContribution>,
271 ) {
272 self.prompt.replace_slot(slot, contributions);
273 }
274
275 pub fn clear_prompt_slot(&mut self, slot: crate::PromptSlot) {
276 self.prompt.clear_slot(slot);
277 }
278
279 pub fn set_prompt_layer(&mut self, prompt: crate::PromptLayer) {
280 self.prompt = prompt;
281 }
282
283 pub fn prompt_layer(&self) -> &crate::PromptLayer {
284 &self.prompt
285 }
286}
287
288impl fmt::Debug for TurnContext {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 f.debug_struct("TurnContext")
291 .field(
292 "plugin_inputs",
293 &self.plugin_inputs.keys().collect::<Vec<_>>(),
294 )
295 .field("has_provider", &self.provider.is_some())
296 .field("has_model", &self.model.is_some())
297 .field("has_prompt_layer", &(!self.prompt.is_empty()))
298 .finish()
299 }
300}
301
302#[derive(Clone)]
303pub struct ModeTurnExtensionHandle(Arc<dyn ModeTurnExtension>);
304
305impl ModeTurnExtensionHandle {
306 pub fn new(extension: impl ModeTurnExtension + 'static) -> Self {
307 Self(Arc::new(extension))
308 }
309
310 pub fn as_any(&self) -> &dyn Any {
311 self.0.as_any()
312 }
313
314 pub fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
315 self.0.prompt_contributions()
316 }
317}
318
319impl fmt::Debug for ModeTurnExtensionHandle {
320 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321 f.write_str("ModeTurnExtensionHandle(..)")
322 }
323}
324
325pub trait ModeTurnExtension: Send + Sync {
326 fn as_any(&self) -> &dyn Any;
327
328 fn prompt_contributions(&self) -> Vec<crate::PromptContribution> {
329 Vec::new()
330 }
331}
332
333#[derive(Clone)]
334pub struct ModeSessionExtensionHandle(Arc<dyn ModeSessionExtension>);
335
336impl ModeSessionExtensionHandle {
337 pub fn new(extension: impl ModeSessionExtension + 'static) -> Self {
338 Self(Arc::new(extension))
339 }
340
341 pub fn as_any(&self) -> &dyn Any {
342 self.0.as_any()
343 }
344}
345
346impl fmt::Debug for ModeSessionExtensionHandle {
347 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
348 f.write_str("ModeSessionExtensionHandle(..)")
349 }
350}
351
352pub trait ModeSessionExtension: Send + Sync {
353 fn as_any(&self) -> &dyn Any;
354}
355
356#[derive(Clone, Debug)]
357pub(super) enum NormalizedItem {
358 Text(String),
359 Image(crate::AttachmentRef),
360}
361
362#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
364pub struct AssistantOutput {
365 pub safe_text: String,
366 pub raw_text: String,
367 pub state: OutputState,
368}
369
370#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
372#[serde(rename_all = "snake_case")]
373pub enum OutputState {
374 Usable,
375 EmptyOutput,
376 TracebackOnly,
377 RecoveredFromError,
378}
379
380#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
382pub struct CodeOutputRecord {
383 pub output: String,
384 #[serde(default, skip_serializing_if = "Option::is_none")]
385 pub error: Option<String>,
386}
387
388#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
390pub struct ExecutionSummary {
391 pub mode: ExecutionMode,
392 #[serde(default)]
393 pub had_tool_calls: bool,
394 #[serde(default)]
395 pub had_code_execution: bool,
396}
397
398#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
400pub struct TurnIssue {
401 pub kind: String,
402 #[serde(default, skip_serializing_if = "Option::is_none")]
403 pub code: Option<String>,
404 #[serde(default, skip_serializing_if = "Option::is_none")]
405 pub terminal_reason: Option<crate::LlmTerminalReason>,
406 pub message: String,
407 #[serde(default, skip_serializing_if = "Option::is_none")]
408 pub raw: Option<String>,
409}
410
411#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
413pub struct AssembledTurn {
414 pub state: SessionStateEnvelope,
415 pub outcome: crate::TurnOutcome,
416 pub assistant_output: AssistantOutput,
417 pub execution: ExecutionSummary,
418 #[serde(default)]
419 pub token_usage: TokenUsage,
420 #[serde(default)]
425 pub children_usage: Vec<TokenLedgerEntry>,
426 #[serde(default)]
427 pub tool_calls: Vec<ToolCallRecord>,
428 #[serde(default)]
429 pub errors: Vec<TurnIssue>,
430}
431
432#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
438pub struct FollowedTurn {
439 pub turns: Vec<AssembledTurn>,
440}
441
442impl FollowedTurn {
443 pub fn final_turn(&self) -> Option<&AssembledTurn> {
444 self.turns.last()
445 }
446
447 pub fn into_final_turn(mut self) -> Option<AssembledTurn> {
448 self.turns.pop()
449 }
450
451 pub fn handoff_count(&self) -> usize {
452 self.turns
453 .iter()
454 .filter(|turn| matches!(turn.outcome, crate::TurnOutcome::Handoff { .. }))
455 .count()
456 }
457}
458
459#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
461pub struct RuntimeError {
462 pub code: String,
463 pub message: String,
464}
465
466impl std::fmt::Display for RuntimeError {
467 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
468 write!(f, "{}: {}", self.code, self.message)
469 }
470}
471
472impl std::error::Error for RuntimeError {}
473
474#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
476pub struct TerminationPolicy {
477 #[serde(default)]
478 pub treat_missing_done_as_failure: bool,
479}
480
481impl Default for TerminationPolicy {
482 fn default() -> Self {
483 Self {
484 treat_missing_done_as_failure: true,
485 }
486 }
487}
488
489#[async_trait::async_trait]
492pub trait EventSink: Send + Sync {
493 fn is_noop(&self) -> bool {
494 false
495 }
496
497 async fn emit(&self, event: SessionEvent);
498}
499
500pub struct NoopEventSink;
502
503#[async_trait::async_trait]
504impl EventSink for NoopEventSink {
505 fn is_noop(&self) -> bool {
506 true
507 }
508
509 async fn emit(&self, _event: SessionEvent) {}
510}
511
512#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
514#[serde(transparent)]
515pub struct TurnActivityId(pub String);
516
517impl TurnActivityId {
518 pub fn new(id: impl Into<String>) -> Self {
519 Self(id.into())
520 }
521
522 pub fn fresh() -> Self {
523 Self(uuid::Uuid::new_v4().to_string())
524 }
525}
526
527#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
533pub struct TurnActivity {
534 pub id: TurnActivityId,
535 pub correlation_id: TurnActivityId,
536 #[serde(flatten)]
537 pub event: TurnEvent,
538}
539
540impl TurnActivity {
541 pub fn new(correlation_id: TurnActivityId, event: TurnEvent) -> Self {
542 Self {
543 id: TurnActivityId::fresh(),
544 correlation_id,
545 event,
546 }
547 }
548
549 pub fn independent(event: TurnEvent) -> Self {
550 let correlation_id = TurnActivityId::fresh();
551 Self::new(correlation_id, event)
552 }
553}
554
555#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
561#[serde(tag = "type", rename_all = "snake_case")]
562#[allow(clippy::large_enum_variant)]
563pub enum TurnEvent {
564 ModelRequestStarted {
565 mode_iteration: usize,
566 },
567 AssistantProseDelta {
568 text: String,
569 },
570 ReasoningDelta {
571 text: String,
572 },
573 CodeBlockStarted {
574 language: String,
575 code: String,
576 },
577 CodeBlockCompleted {
578 language: String,
579 output: String,
580 error: Option<String>,
581 success: bool,
582 duration_ms: u64,
583 tool_call_ids: Vec<String>,
584 },
585 ToolCallStarted {
586 call_id: Option<String>,
587 name: String,
588 args: serde_json::Value,
589 },
590 ToolCallCompleted {
591 call_id: Option<String>,
592 name: String,
593 args: serde_json::Value,
594 output: crate::ToolCallOutput,
595 duration_ms: u64,
596 },
597 SubmittedValue {
598 value: serde_json::Value,
599 },
600 ToolValue {
601 tool_name: String,
602 value: serde_json::Value,
603 },
604 Usage {
605 mode_iteration: usize,
606 usage: TokenUsage,
607 cumulative: TokenUsage,
608 },
609 ChildUsage {
610 session_id: String,
611 source: String,
612 model: String,
613 mode_iteration: usize,
614 usage: TokenUsage,
615 cumulative: TokenUsage,
616 },
617 RetryStatus {
618 wait_seconds: u64,
619 attempt: usize,
620 max_attempts: usize,
621 reason: String,
622 },
623 PluginSurface {
624 plugin_id: String,
625 event: crate::PluginSurfaceEvent,
626 },
627 QueuedInputAccepted {
628 checkpoint: crate::CheckpointKind,
629 inputs: Vec<crate::AcceptedInjectedTurnInput>,
630 },
631 QueuedMessagesCommitted {
632 messages: Vec<crate::PluginMessage>,
633 checkpoint: crate::CheckpointKind,
634 },
635 Error {
636 message: String,
637 },
638}
639
640#[async_trait::async_trait]
641pub trait TurnActivitySink: Send + Sync {
642 fn is_noop(&self) -> bool {
643 false
644 }
645
646 async fn emit(&self, activity: TurnActivity);
647}
648
649pub struct NoopTurnActivitySink;
650
651#[async_trait::async_trait]
652impl TurnActivitySink for NoopTurnActivitySink {
653 fn is_noop(&self) -> bool {
654 true
655 }
656
657 async fn emit(&self, _activity: TurnActivity) {}
658}
659
660enum RuntimeStreamEvent {
661 Session(SessionEvent),
662 Turn(TurnActivity),
663}
664
665#[derive(Clone)]
666pub struct SessionStoreCreateRequest {
667 pub session_id: String,
668 pub parent_session_id: Option<String>,
669 pub policy: SessionPolicy,
670}
671
672pub trait SessionStoreFactory: Send + Sync {
673 fn create_store(
674 &self,
675 request: &SessionStoreCreateRequest,
676 ) -> Result<Arc<dyn crate::store::RuntimePersistence>, String>;
677}
678
679fn debug_rss_kb() -> Option<u64> {
680 let status = std::fs::read_to_string("/proc/self/status").ok()?;
681 status.lines().find_map(|line| {
682 let value = line.strip_prefix("VmRSS:")?.trim();
683 let kb = value.split_whitespace().next()?.parse::<u64>().ok()?;
684 Some(kb)
685 })
686}
687
688pub struct LashRuntime {
690 pub(in crate::runtime) session: Option<Session>,
691 pub(in crate::runtime) policy: SessionPolicy,
692 pub(in crate::runtime) host: RuntimeHost,
693 pub(in crate::runtime) services: RuntimeServices,
694 pub(in crate::runtime) state: PersistedSessionState,
695 pub(in crate::runtime) runtime_scope_id: Arc<str>,
696 pub(in crate::runtime) managed_sessions: Arc<Mutex<HashMap<String, RuntimeHandle>>>,
697 pub(in crate::runtime) active_handoff_continuations: Arc<Mutex<HashMap<String, String>>>,
698 pub(in crate::runtime) managed_turns: Arc<Mutex<HashMap<String, ManagedSessionTurn>>>,
699 pub(in crate::runtime) mode_turn_options: crate::ModeTurnOptions,
701 pub(in crate::runtime) shared_token_ledger: Arc<std::sync::Mutex<Vec<TokenLedgerEntry>>>,
706 pub(in crate::runtime) background_sync_needed: Arc<AtomicBool>,
707 pub(in crate::runtime) pending_first_turn_inputs:
713 Arc<std::sync::Mutex<HashMap<String, crate::PluginMessage>>>,
714 pub(in crate::runtime) turn_phase_probe: Option<Arc<dyn RuntimeTurnPhaseProbe>>,
715}