use super::*;
/// Maps a turn outcome onto the `(status, done_reason, handoff)` triple recorded in the
/// `TurnCompleted` trace event.
///
/// `Finished` and `Handoff` outcomes are reported with status `"completed"`; `Stopped`
/// outcomes are reported as `"failed"` with the label from [`trace_stop_reason`]. Only a
/// handoff produces a `TraceHandoff`, which names the successor session.
fn trace_fields_from_outcome(
    outcome: &TurnOutcome,
) -> (&'static str, &'static str, Option<lash_trace::TraceHandoff>) {
    match outcome {
        TurnOutcome::Finished(finish) => {
            // All finish kinds share the same status; only the reason label differs.
            let done_reason = match finish {
                TurnFinish::AssistantMessage { .. } => "assistant_message",
                TurnFinish::SubmittedValue { .. } => "submitted_value",
                TurnFinish::ToolValue { .. } => "tool_value",
            };
            ("completed", done_reason, None)
        }
        TurnOutcome::Handoff { session_id } => {
            let handoff = lash_trace::TraceHandoff {
                successor_session_id: session_id.clone(),
            };
            ("completed", "handoff", Some(handoff))
        }
        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
    }
}
/// Returns the snake_case label used in trace events for the given stop reason.
///
/// The match is exhaustive on purpose, so adding a `TurnStop` variant forces a label to be
/// chosen here.
fn trace_stop_reason(stop: &TurnStop) -> &'static str {
    match stop {
        // Struct-like variants: their fields are ignored, only the variant picks the label.
        TurnStop::SubmittedError { .. } => "submitted_error",
        TurnStop::ToolError { .. } => "tool_error",
        // Field-less variants.
        TurnStop::Cancelled => "cancelled",
        TurnStop::Incomplete => "incomplete",
        TurnStop::InvalidInput => "invalid_input",
        TurnStop::MaxTurns => "max_turns",
        TurnStop::PluginAbort => "plugin_abort",
        TurnStop::ProviderError => "provider_error",
        TurnStop::RuntimeError => "runtime_error",
        TurnStop::ToolFailure => "tool_failure",
    }
}
impl LashRuntime {
/// Returns the context-token budget configured on the runtime policy.
///
/// # Panics
///
/// Panics if the policy does not carry an explicit `max_context_tokens` value.
fn max_context_tokens(&self) -> usize {
    let configured = self.policy.max_context_tokens;
    configured.expect("lash runtime requires explicit max_context_tokens")
}
/// Installs the probe that is notified when turn phases begin and end, replacing any probe
/// that was set before.
#[doc(hidden)]
pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
    let installed = Some(probe);
    self.turn_phase_probe = installed;
}
/// Notifies the installed phase probe, if any, that `phase` is starting.
fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
    let Some(probe) = &self.turn_phase_probe else {
        // No probe installed: nothing to report.
        return;
    };
    probe.begin(phase);
}
/// Notifies the installed phase probe, if any, that `phase` has finished.
fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
    let Some(probe) = &self.turn_phase_probe else {
        // No probe installed: nothing to report.
        return;
    };
    probe.end(phase);
}
pub async fn stream_turn(
&mut self,
input: TurnInput,
events: &dyn EventSink,
cancel: CancellationToken,
) -> Result<AssembledTurn, RuntimeError> {
self.stream_turn_with_semantic_events(input, events, &NoopTurnActivitySink, cancel)
.await
}
/// Runs one turn, streaming session events to `events` and turn activity to `turn_events`.
///
/// If the active handoff chain starting at this runtime's session leads to a different
/// session, the turn is executed on that successor via `stream_turn_on_handoff_successor`.
/// Otherwise the turn runs on this runtime through `stream_turn_inner`.
///
/// # Errors
///
/// Propagates the `RuntimeError` produced by whichever of the two paths is taken.
pub(crate) async fn stream_turn_with_semantic_events(
    &mut self,
    input: TurnInput,
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
    cancel: CancellationToken,
) -> Result<AssembledTurn, RuntimeError> {
    if let Some(execution_session_id) = self
        .active_handoff_leaf(&self.state.session_id)
        .await
        .filter(|session_id| session_id != &self.state.session_id)
    {
        return self
            .stream_turn_on_handoff_successor(
                execution_session_id,
                input,
                events,
                turn_events,
                cancel,
            )
            .await;
    }
    // The handoff branch above always returns, so `input` and `cancel` are still owned here
    // and can be moved. Cloning them would copy every image blob in the input for nothing.
    self.stream_turn_inner(input, events, turn_events, cancel)
        .await
}
async fn active_handoff_leaf(&self, session_id: &str) -> Option<String> {
let continuations = self.active_handoff_continuations.lock().await;
let mut current = session_id.to_string();
let mut seen = std::collections::HashSet::new();
while seen.insert(current.clone()) {
let Some(next) = continuations.get(¤t).cloned() else {
return (current != session_id).then_some(current);
};
current = next;
}
None
}
/// Runs the turn on the managed runtime registered for `execution_session_id` instead of on
/// this runtime.
///
/// The successor's turn index is set from this runtime before the turn, and this runtime's
/// turn index is updated from the assembled turn afterwards. After a successful turn the
/// handle's `publish_from` is called with the successor runtime.
///
/// # Errors
///
/// Returns `handoff_successor_missing` when no managed session is registered under
/// `execution_session_id`, and propagates any error from the successor's turn.
async fn stream_turn_on_handoff_successor(
    &mut self,
    execution_session_id: String,
    input: TurnInput,
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
    cancel: CancellationToken,
) -> Result<AssembledTurn, RuntimeError> {
    // Clone the handle inside its own scope so the registry lock is released before the
    // successor runtime's lock is awaited.
    let lookup = {
        let sessions = self.managed_sessions.lock().await;
        sessions.get(&execution_session_id).cloned()
    };
    let Some(successor_handle) = lookup else {
        return Err(RuntimeError {
            code: "handoff_successor_missing".to_string(),
            message: format!("active handoff session `{execution_session_id}` is unavailable"),
        });
    };
    let mut successor = successor_handle.runtime.lock().await;
    successor.state.turn_index = self.state.turn_index;
    let assembled = successor
        .stream_turn_inner(input, events, turn_events, cancel)
        .await?;
    successor_handle.publish_from(&successor);
    self.state.turn_index = assembled.state.turn_index;
    Ok(assembled)
}
pub async fn stream_turn_following_handoffs(
&mut self,
input: TurnInput,
events: &dyn EventSink,
cancel: CancellationToken,
) -> Result<FollowedTurn, RuntimeError> {
self.stream_turn_following_handoffs_with_semantic_events(
input,
events,
&NoopTurnActivitySink,
cancel,
)
.await
}
pub async fn stream_turn_events_following_handoffs(
&mut self,
input: TurnInput,
turn_events: &dyn TurnActivitySink,
cancel: CancellationToken,
) -> Result<FollowedTurn, RuntimeError> {
self.stream_turn_following_handoffs_with_semantic_events(
input,
&NoopEventSink,
turn_events,
cancel,
)
.await
}
/// Runs a turn and keeps running follow-up turns for as long as the outcome is a handoff,
/// streaming session events to `events` and turn activity to `turn_events`.
pub async fn stream_turn_with_events_following_handoffs(
    &mut self,
    input: TurnInput,
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
    cancel: CancellationToken,
) -> Result<FollowedTurn, RuntimeError> {
    let followed = self
        .stream_turn_following_handoffs_with_semantic_events(input, events, turn_events, cancel)
        .await;
    followed
}
/// Runs `input` as a turn and, while each turn ends in a handoff, runs the successor
/// session's pending first-turn input as the next turn.
///
/// All turns in the chain share one trace turn id (the caller's, or a fresh UUID) and reuse
/// the caller's mode turn options and turn context. The assembled turns are returned in
/// execution order.
///
/// # Errors
///
/// Propagates any turn error, and returns `handoff_missing_first_turn` when a handoff names
/// a successor session that has no pending first-turn input.
async fn stream_turn_following_handoffs_with_semantic_events(
    &mut self,
    mut input: TurnInput,
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
    cancel: CancellationToken,
) -> Result<FollowedTurn, RuntimeError> {
    // Settings from the caller's input that every follow-up turn inherits.
    let inherited_mode_options = input.mode_turn_options.clone();
    let inherited_turn_context = input.turn_context.clone();
    let shared_trace_turn_id = match input.trace_turn_id.clone() {
        Some(existing) => existing,
        None => uuid::Uuid::new_v4().to_string(),
    };
    input.trace_turn_id = Some(shared_trace_turn_id.clone());

    let mut turns = Vec::new();
    loop {
        let finished = self
            .stream_turn_with_semantic_events(input, events, turn_events, cancel.clone())
            .await?;
        let handoff_target = if let TurnOutcome::Handoff { session_id } = &finished.outcome {
            Some(session_id.clone())
        } else {
            None
        };
        turns.push(finished);
        let Some(handoff_target) = handoff_target else {
            // Any non-handoff outcome ends the chain.
            return Ok(FollowedTurn { turns });
        };

        // The lock guard is a temporary of this statement, so it is released before the
        // next await point.
        let pending_seed = self
            .pending_first_turn_inputs
            .lock()
            .expect("pending first turn inputs lock")
            .remove(&handoff_target);
        let Some(seed) = pending_seed else {
            return Err(RuntimeError {
                code: "handoff_missing_first_turn".to_string(),
                message: format!(
                    "handoff session `{handoff_target}` did not provide a first turn"
                ),
            });
        };

        input = turn_input_from_plugin_message(seed);
        input.turn_context = inherited_turn_context.clone();
        input.trace_turn_id = Some(shared_trace_turn_id.clone());
        input.mode_turn_options = inherited_mode_options.clone();

        let successor_lookup = {
            let sessions = self.managed_sessions.lock().await;
            sessions.get(&handoff_target).cloned()
        };
        if let Some(successor_handle) = successor_lookup {
            let mut successor_runtime = successor_handle.runtime.lock().await;
            successor_runtime.state.turn_index = self.state.turn_index.saturating_sub(1);
            successor_handle.publish_from(&successor_runtime);
        }
    }
}
/// Runs one turn on this runtime: validates and normalizes the input, builds the user
/// message, lets the plugin session prepare the turn context, and then delegates to
/// `stream_prepared_turn`.
///
/// Input that fails normalization is not reported as `Err`. Instead an error event, a
/// `TurnOutcome::Stopped(TurnStop::InvalidInput)` event and `Done` are emitted and an
/// assembled turn built from the current state is returned as `Ok`.
///
/// # Errors
///
/// Returns a `RuntimeError` with code
/// - `mode_turn_extension` when the mode session rejects `input.mode_extension`,
/// - `plugin_session_manager` when the session manager for the turn cannot be created,
/// - `context_prepare_turn` when no session is available or context preparation fails,
/// and propagates any error from `stream_prepared_turn`.
async fn stream_turn_inner(
&mut self,
input: TurnInput,
events: &dyn EventSink,
turn_events: &dyn TurnActivitySink,
cancel: CancellationToken,
) -> Result<AssembledTurn, RuntimeError> {
self.refresh_session_graph_from_store().await;
// Validate the mode extension only when both an extension and a session are present.
if let Some(extension) = &input.mode_extension
&& let Some(session) = self.session.as_ref()
{
let mode_session = std::sync::Arc::clone(session.plugins().mode_session());
mode_session
.validate_turn_extension(extension)
.await
.map_err(|err| RuntimeError {
code: "mode_turn_extension".to_string(),
message: err.to_string(),
})?;
}
// Captured before `last_prompt_usage` is cleared further down.
let previous_prompt_usage = self.state.last_prompt_usage.clone();
let normalized = match self.normalize_input_items(&input.items, &input.image_blobs) {
Ok(items) => items,
Err(e) => {
// Invalid input: emit error + outcome + done and return an assembled turn.
// `turn_index` has not been computed yet, so the state is exported as it is now.
self.state.last_prompt_usage = None;
let mut assembler = TurnAssembler::default();
let error_event = SessionEvent::Error {
message: e.clone(),
envelope: Some(crate::session_model::ErrorEnvelope {
kind: "input_validation".to_string(),
code: Some("invalid_turn_input".to_string()),
terminal_reason: None,
user_message: e.clone(),
raw: None,
}),
};
assembler.push(&error_event);
emit_turn_activity_to_sink(
turn_events,
TurnActivity::independent(TurnEvent::Error { message: e }),
)
.await;
emit_session_event_to_sink(events, error_event).await;
let outcome_event = SessionEvent::TurnOutcome {
outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
};
assembler.push(&outcome_event);
emit_session_event_to_sink(events, outcome_event).await;
assembler.push(&SessionEvent::Done);
emit_session_event_to_sink(events, SessionEvent::Done).await;
return Ok(assembler.finish(
self.state.export_state(),
false,
None,
&self.host.core.termination,
));
}
};
let turn_index = self.state.turn_index + 1;
// Reuse the caller's trace turn id when given; otherwise generate a fresh UUID.
let trace_turn_id = input
.trace_turn_id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
// The `TurnStarted` trace event is emitted only when a trace sink is configured.
if self.host.core.trace_sink.is_some() {
let mut trace_metadata = std::collections::BTreeMap::new();
trace_metadata.insert(
"input_item_count".to_string(),
serde_json::json!(normalized.len()),
);
crate::trace::emit_trace(
&self.host.core.trace_sink,
&self.host.core.trace_context,
lash_trace::TraceContext::default()
.for_session(self.state.session_id.clone())
.for_turn_index(turn_index)
.for_turn(trace_turn_id.clone()),
lash_trace::TraceEvent::TurnStarted {
metadata: trace_metadata,
},
);
}
let base_read_model = self.state.read_model();
let base_messages = base_read_model.messages;
let base_render_cache = base_read_model.prompt_render_cache;
// Messages added by this turn on top of the base messages; starts with the user message.
let mut turn_delta = Vec::new();
let user_id = fresh_message_id();
let mut user_parts: Vec<Part> = Vec::new();
for item in normalized {
match item {
NormalizedItem::Text(text) => {
// Empty text items do not produce a part.
if text.is_empty() {
continue;
}
user_parts.push(Part {
id: format!("{}.p{}", user_id, user_parts.len()),
kind: PartKind::Text,
content: text,
attachment: None,
tool_call_id: None,
tool_name: None,
tool_replay: None,
prune_state: PruneState::Intact,
reasoning_meta: None,
response_meta: None,
});
}
NormalizedItem::Image(reference) => {
// Image parts have empty content and carry the reference as an attachment.
user_parts.push(Part {
id: format!("{}.p{}", user_id, user_parts.len()),
kind: PartKind::Image,
content: String::new(),
attachment: Some(crate::session_model::message::PartAttachment {
reference,
}),
tool_call_id: None,
tool_name: None,
tool_replay: None,
prune_state: PruneState::Intact,
reasoning_meta: None,
response_meta: None,
});
}
}
}
// A user message always has at least one part: fall back to a single empty text part.
if user_parts.is_empty() {
user_parts.push(Part {
id: format!("{}.p0", user_id),
kind: PartKind::Text,
content: String::new(),
attachment: None,
tool_call_id: None,
tool_name: None,
tool_replay: None,
prune_state: PruneState::Intact,
reasoning_meta: None,
response_meta: None,
});
}
// NOTE(review): presumably rewrites the part ids assigned above — confirm against the helper.
reassign_part_ids(&user_id, &mut user_parts);
turn_delta.push(Message {
id: user_id.clone(),
role: MessageRole::User,
parts: shared_parts(user_parts),
origin: None,
});
let manager = self
.runtime_session_manager_for_turn(None)
.map_err(|err| RuntimeError {
code: "plugin_session_manager".to_string(),
message: err.to_string(),
})?;
let plugin_session = self
.session
.as_ref()
.map(|s| Arc::clone(s.plugins()))
.ok_or_else(|| RuntimeError {
code: "context_prepare_turn".to_string(),
message: "runtime session not available".to_string(),
})?;
let turn_ctx = crate::TurnTransformContext {
session_id: self.state.session_id.clone(),
state: self.read_view(),
prompt_usage: previous_prompt_usage.clone(),
max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
host: manager.clone(),
};
// NOTE(review): when `prepare_turn_context` fails, `?` returns before the matching
// `mark_phase_end` below is reached, so the probe sees a begin without an end.
self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
let prepared_context = plugin_session
.prepare_turn_context(
&turn_ctx,
crate::session_model::context::PreparedContext {
messages: crate::MessageSequence::from_base_and_delta(
base_messages,
turn_delta,
)
.with_base_render_cache(base_render_cache),
..Default::default()
},
)
.await
.map_err(|err| RuntimeError {
code: "context_prepare_turn".to_string(),
message: err.to_string(),
})?;
self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
drop(turn_ctx);
let messages = prepared_context.messages;
// Hand the prepared tool providers and prompt contributions to the session, if any.
if let Some(session) = self.session.as_mut() {
session.set_context_surface(
prepared_context.tool_providers,
prepared_context.prompt_contributions,
prepared_context.include_base_tools,
);
}
// Clear the stored prompt usage; the previous value is passed along explicitly.
self.state.last_prompt_usage = None;
self.stream_prepared_turn(
messages,
previous_prompt_usage,
input.mode_turn_options.clone(),
input.mode_extension.clone(),
input.turn_context.clone(),
trace_turn_id,
turn_index,
events,
turn_events,
cancel,
)
.await
}
pub async fn run_turn_assembled(
&mut self,
input: TurnInput,
cancel: CancellationToken,
) -> Result<AssembledTurn, RuntimeError> {
self.stream_turn(input, &NoopEventSink, cancel).await
}
/// Executes a turn whose message sequence has already been prepared.
///
/// The steps, in order: derive the per-turn policy, run the plugins' `prepare_turn`, run the
/// turn driver on a spawned task while forwarding its events to the sinks, fold token usage
/// into the turn pipeline, run the plugins' `finalize_turn`, commit the result and emit the
/// `TurnCompleted` trace event.
///
/// A plugin abort during `prepare_turn` and a failed join of the driver task are both
/// returned as `Ok` with a `TurnIssue` attached to the assembled turn, not as `Err`.
///
/// # Errors
///
/// Returns a `RuntimeError` with code `plugin_session_manager`, `plugin_prepare_turn`,
/// `store_commit_failed` or `plugin_finalize_turn` for the corresponding step, and
/// propagates any error from the pipeline's `final_commit`.
///
/// # Panics
///
/// Panics when the runtime has no session, or when a lock on the shared token ledger
/// cannot be obtained.
// The argument list mirrors everything the turn needs; grouping it is left to callers.
#[allow(clippy::too_many_arguments)]
pub async fn stream_prepared_turn(
&mut self,
messages: crate::MessageSequence,
_previous_prompt_usage: Option<PromptUsage>,
mode_turn_options: Option<crate::ModeTurnOptions>,
mode_extension: Option<crate::ModeTurnExtensionHandle>,
turn_context: crate::TurnContext,
trace_turn_id: String,
turn_index: usize,
events: &dyn EventSink,
turn_events: &dyn TurnActivitySink,
cancel: CancellationToken,
) -> Result<AssembledTurn, RuntimeError> {
// Channel carrying runtime stream events from the driver (and the child-usage relay)
// back to this function, which forwards them to the sinks.
let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
// Per-turn policy: start from the runtime policy, then apply the turn context's provider
// (with that provider's default model and variant), then its explicit model selection.
// The model selection is applied last, so it wins over the provider default.
let mut turn_policy = self.policy.clone();
if let Some(provider) = turn_context.provider().cloned() {
let model = provider.default_model().to_string();
let model_variant = provider.default_model_variant(&model).map(str::to_string);
turn_policy.provider = provider;
turn_policy.model = model;
turn_policy.model_variant = model_variant;
}
if let Some((model, variant)) = turn_context.model_selection() {
turn_policy.model = model.to_string();
turn_policy.model_variant = variant.map(str::to_string);
}
// Per-turn mode options override the runtime's own.
let effective_mode_turn_options = mode_turn_options
.clone()
.unwrap_or_else(|| self.mode_turn_options.clone());
let manager = self
.runtime_session_manager_for_turn(Some(child_usage_event_relay.clone()))
.map_err(|err| RuntimeError {
code: "plugin_session_manager".to_string(),
message: err.to_string(),
})?;
let plugins = {
let session = self
.session
.as_ref()
.expect("lash runtime session must be available");
Arc::clone(session.plugins())
};
let capture_text_deltas =
turn_policy.provider.requires_streaming() || plugins.has_assistant_stream_hooks();
let mut assembler = TurnAssembler::new(capture_text_deltas);
// NOTE(review): if `prepare_turn` fails, `?` returns before the matching
// `mark_phase_end` inside the loop is reached.
self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
let prepared = {
let prepare_turn = plugins.prepare_turn(PrepareTurnRequest {
session_id: self.state.session_id.clone(),
state: crate::SessionReadView::from_runtime_state(
&self.state,
turn_policy.clone(),
effective_mode_turn_options.clone(),
),
messages,
host: manager.clone(),
turn_context: turn_context.clone(),
});
tokio::pin!(prepare_turn);
// Drive `prepare_turn` to completion while forwarding any events that arrive on the
// channel in the meantime.
loop {
tokio::select! {
prepared = &mut prepare_turn => {
let prepared = prepared.map_err(|err| RuntimeError {
code: "plugin_prepare_turn".to_string(),
message: err.to_string(),
})?;
self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
break prepared;
}
maybe_event = event_rx.recv() => {
if let Some(event) = maybe_event {
emit_runtime_stream_event_to_sinks(
events,
turn_events,
event,
&mut assembler,
)
.await;
}
}
}
}
};
// Events produced by `prepare_turn` are recorded in the assembler and then emitted.
for event in &prepared.events {
assembler.push(event);
}
emit_session_events_to_sink(events, prepared.events).await;
// Plugin abort: report the error, stop the turn with `PluginAbort` and return early.
// The state returned comes from a pipeline built on a clone of `self.state`;
// `self.state` itself is not assigned in this branch.
if let Some(abort) = prepared.abort {
drop(event_tx);
let mut turn_pipeline = TurnCommitPipeline::from_state(self.state.clone());
turn_pipeline.apply_prepared_messages(&prepared.messages);
let state = turn_pipeline.into_final_state();
let issue = TurnIssue {
kind: "plugin".to_string(),
code: Some(abort.code),
terminal_reason: None,
message: abort.message.clone(),
raw: None,
};
let error_event = SessionEvent::Error {
message: abort.message,
envelope: Some(crate::session_model::ErrorEnvelope {
kind: "plugin".to_string(),
code: issue.code.clone(),
terminal_reason: None,
user_message: issue.message.clone(),
raw: None,
}),
};
assembler.push(&error_event);
emit_turn_activity_to_sink(
turn_events,
TurnActivity::independent(TurnEvent::Error {
message: issue.message.clone(),
}),
)
.await;
emit_session_event_to_sink(events, error_event).await;
let outcome_event = SessionEvent::TurnOutcome {
outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
};
assembler.push(&outcome_event);
emit_session_event_to_sink(events, outcome_event).await;
assembler.push(&SessionEvent::Done);
emit_session_event_to_sink(events, SessionEvent::Done).await;
return Ok(assembler.finish(
state.export_state(),
cancel.is_cancelled(),
Some(issue),
&self.host.core.termination,
));
}
let mut turn_pipeline = TurnCommitPipeline::from_state(self.state.clone());
let store = self
.session
.as_ref()
.and_then(|session| session.history_store());
turn_pipeline
.prepared_checkpoint(
store.as_ref().map(|store| store.as_ref()),
turn_policy.clone(),
turn_index,
&prepared.messages,
self.session.as_mut(),
)
.await
.map_err(|err| RuntimeError {
code: "store_commit_failed".to_string(),
message: err.to_string(),
})?;
// Keep a token clone for later `is_cancelled` checks; `cancel` is moved into the task.
let cancel_state = cancel.clone();
// The session is moved out of `self` into the driver for the duration of the run and
// put back once the driver is returned by the task (see below).
let session = self
.session
.take()
.expect("lash runtime session must be available");
let mut driver = RuntimeTurnDriver {
session,
policy: turn_policy.clone(),
host: self.host.clone(),
session_id: self.state.session_id.clone(),
turn_id: trace_turn_id.clone(),
turn_index,
turn_pipeline,
llm_stream_summaries: HashMap::new(),
next_llm_ordinal: 0,
session_manager: manager,
mode_turn_options: effective_mode_turn_options,
mode_extension,
turn_context,
turn_phase_probe: self.turn_phase_probe.clone(),
};
let mode_run_offset = 0;
// Run the driver on its own task; it hands the driver back together with its results.
let run_task = tokio::spawn(async move {
let (new_messages, new_mode_iteration) = driver
.run(prepared.messages, event_tx, cancel, mode_run_offset)
.await;
(driver, new_messages, new_mode_iteration)
});
tokio::pin!(run_task);
self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
// Forward events to the sinks until the driver task completes.
let (driver, new_messages, _new_mode_iteration) = loop {
tokio::select! {
maybe_event = event_rx.recv() => {
if let Some(event) = maybe_event {
emit_runtime_stream_event_to_sinks(
events,
turn_events,
event,
&mut assembler,
)
.await;
}
}
joined = &mut run_task => {
child_usage_event_relay.clear();
let joined = match joined {
Ok(v) => v,
Err(e) => {
// NOTE(review): on a join failure the driver (and the session it
// owns) is not returned, so `self.session` stays `None` after this
// early return, and the EffectLoop phase is never marked as ended.
let issue = TurnIssue {
kind: "runtime".to_string(),
code: Some("run_task_join_failed".to_string()),
terminal_reason: None,
message: format!("Runtime turn task failed: {e}"),
raw: None,
};
return Ok(assembler.finish(
self.state.export_state(),
cancel_state.is_cancelled(),
Some(issue),
&self.host.core.termination,
));
}
};
break joined;
}
}
};
// Drain whatever is still queued; the loop ends once `recv` yields `None`.
while let Some(event) = event_rx.recv().await {
emit_runtime_stream_event_to_sinks(events, turn_events, event, &mut assembler).await;
}
self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
tracing::debug!(
rss_kb = debug_rss_kb(),
new_message_count = new_messages.len(),
tool_call_count = assembler.tool_calls.len(),
"runtime post-run_task"
);
// Take the entries accumulated in the shared token ledger, leaving it empty.
let child_ledger = {
let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
std::mem::take(&mut *ledger)
};
let RuntimeTurnDriver {
session,
policy,
mut turn_pipeline,
..
} = driver;
// Put the session back and reset the runtime policy from the state's policy; the
// per-turn `policy` recovered from the driver is only used for usage normalization.
self.session = Some(session);
self.policy = self.state.policy.clone();
turn_pipeline.state_mut().policy = self.policy.clone();
turn_pipeline.state_mut().turn_index = turn_index;
// Usage delta for this turn: child ledger entries plus one "turn" entry when the
// assembler recorded any usage.
let mut turn_usage_delta = child_ledger.clone();
if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
let entry = TokenLedgerEntry {
source: "turn".to_string(),
model: self.policy.model.clone(),
usage: assembler.token_usage.clone(),
};
turn_usage_delta.push(entry);
}
let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
turn_pipeline.finalize_turn_read_state(
new_messages,
&assembler.tool_calls,
cancel_state.is_cancelled(),
);
// The state's token usage is overwritten only when the assembler recorded usage.
if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
}
let last_prompt_usage = assembler
.last_llm_usage()
.and_then(|usage| normalize_prompt_usage(&policy.provider, usage));
// Created before assembling so a manager failure is reported before finalization.
let finalize_manager = if self.session.is_some() {
Some(
self.runtime_session_manager_for_turn(None)
.map_err(|err| RuntimeError {
code: "plugin_session_manager".to_string(),
message: err.to_string(),
})?,
)
} else {
None
};
tracing::debug!(
rss_kb = debug_rss_kb(),
state_message_count = turn_pipeline.state_mut().read_model().messages.len(),
graph_node_count = turn_pipeline.state_mut().session_graph.nodes.len(),
token_ledger_entries = turn_pipeline.state_mut().token_ledger.len(),
"runtime before assembler.finish"
);
turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage.clone();
let assembled_state = turn_pipeline.export_state_for_assembly();
let assembled = assembler.finish(
assembled_state,
cancel_state.is_cancelled(),
None,
&self.host.core.termination,
);
tracing::debug!(
rss_kb = debug_rss_kb(),
assembled_message_count = assembled.state.read_model().messages.len(),
assembled_graph_node_count = assembled.state.session_graph.nodes.len(),
"runtime after assembler.finish"
);
if let Some(session) = self.session.as_ref() {
let plugins = Arc::clone(session.plugins());
let manager = finalize_manager.expect("finalize manager should exist with session");
tracing::debug!(rss_kb = debug_rss_kb(), "runtime before finalize_turn");
// NOTE(review): a `finalize_turn` error returns via `?` before the matching
// `mark_phase_end`; the same applies to `final_commit` and the PersistTurn phase.
self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
let finalized = plugins
.finalize_turn(assembled, manager)
.await
.map_err(|err| RuntimeError {
code: "plugin_finalize_turn".to_string(),
message: err.to_string(),
})?;
self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
tracing::debug!(
rss_kb = debug_rss_kb(),
finalized_message_count = finalized.turn.state.read_model().messages.len(),
"runtime after finalize_turn"
);
let mut returned_turn = finalized.turn;
tracing::debug!(
rss_kb = debug_rss_kb(),
tool_state_present = turn_pipeline.state_mut().tool_state_ref.is_some()
|| turn_pipeline.state_mut().tool_state_snapshot.is_some(),
plugin_snapshot_present = turn_pipeline.state_mut().plugin_snapshot_ref.is_some()
|| turn_pipeline.state_mut().plugin_snapshot.is_some(),
"runtime before stamp_runtime_state"
);
self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
turn_pipeline
.final_commit(&mut returned_turn, self.session.as_mut(), &turn_usage_delta)
.await?;
tracing::debug!(
rss_kb = debug_rss_kb(),
resident_graph_node_count = returned_turn.state.session_graph.nodes.len(),
persisted_message_count = returned_turn.state.read_model().messages.len(),
"runtime after stamp_runtime_state"
);
// Events produced by finalization are emitted only after the commit succeeded.
emit_session_events_to_sink(events, finalized.events).await;
self.state = turn_pipeline.into_final_state();
// Best effort: the `TurnPersisted` plugin event is skipped when no session manager
// can be obtained.
if let Some(session) = self.session.as_ref()
&& let Ok(host) = self.runtime_session_manager()
{
session
.plugins()
.emit_runtime_event(crate::PluginRuntimeEvent::TurnPersisted(
crate::SessionStateChangedContext {
session_id: self.state.session_id.clone(),
state: crate::SessionReadView::from_exported_state(
&returned_turn.state,
),
host,
},
))
.await;
}
self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
if self.host.core.trace_sink.is_some() {
let (status, done_reason, handoff) =
trace_fields_from_outcome(&returned_turn.outcome);
crate::trace::emit_trace(
&self.host.core.trace_sink,
&self.host.core.trace_context,
lash_trace::TraceContext::default()
.for_session(returned_turn.state.session_id.clone())
.for_turn_index(returned_turn.state.turn_index)
.for_turn(trace_turn_id.clone()),
lash_trace::TraceEvent::TurnCompleted {
status: status.to_string(),
done_reason: done_reason.to_string(),
handoff,
},
);
}
Ok(returned_turn)
} else {
// NOTE(review): `self.session` was set to `Some` after the driver returned, so this
// branch looks unreachable in practice — confirm before relying on it.
self.state.apply_exported_state(&assembled.state);
if self.host.core.trace_sink.is_some() {
let (status, done_reason, handoff) = trace_fields_from_outcome(&assembled.outcome);
crate::trace::emit_trace(
&self.host.core.trace_sink,
&self.host.core.trace_context,
lash_trace::TraceContext::default()
.for_session(assembled.state.session_id.clone())
.for_turn_index(assembled.state.turn_index)
.for_turn(trace_turn_id),
lash_trace::TraceEvent::TurnCompleted {
status: status.to_string(),
done_reason: done_reason.to_string(),
handoff,
},
);
}
Ok(assembled)
}
}
/// Normalizes turn input items by calling the free function `normalize_input_items` with
/// the host's attachment store.
///
/// The error string of a failed normalization is passed through unchanged.
fn normalize_input_items(
    &self,
    items: &[InputItem],
    image_blobs: &HashMap<String, Vec<u8>>,
) -> Result<Vec<NormalizedItem>, String> {
    let attachment_store = self.host.core.attachment_store.as_ref();
    normalize_input_items(items, image_blobs, attachment_store)
}
}
fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
let mut items = Vec::new();
if !message.content.is_empty() {
items.push(InputItem::Text {
text: message.content,
});
}
let mut image_blobs = HashMap::new();
for (index, bytes) in message.images.into_iter().enumerate() {
let id = format!("handoff-seed-image-{index}");
image_blobs.insert(id.clone(), bytes);
items.push(InputItem::ImageRef { id });
}
TurnInput {
items,
image_blobs,
mode_turn_options: None,
trace_turn_id: None,
mode_extension: None,
turn_context: crate::TurnContext::default(),
}
}
/// Emits `activity` to the turn-activity sink, unless the sink reports itself as a no-op.
async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
    if events.is_noop() {
        // No-op sinks are skipped entirely.
        return;
    }
    events.emit(activity).await;
}
/// Records a runtime stream event in the assembler and forwards it to the matching sink.
///
/// Session events go to `events`; turn activities go to `turn_events`. In both cases the
/// assembler sees the event before the sink does.
async fn emit_runtime_stream_event_to_sinks(
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
    event: RuntimeStreamEvent,
    assembler: &mut TurnAssembler,
) {
    match event {
        RuntimeStreamEvent::Turn(turn_activity) => {
            assembler.push_turn_activity(&turn_activity);
            emit_turn_activity_to_sink(turn_events, turn_activity).await;
        }
        RuntimeStreamEvent::Session(session_event) => {
            assembler.push(&session_event);
            emit_session_event_to_sink(events, session_event).await;
        }
    }
}