use super::*;
use crate::modes::RlmTurnBuilderExt as _;
#[derive(Clone, Debug)]
struct DurableEffectInvocation {
kind: lash_core::RuntimeEffectKind,
turn_id: Option<String>,
replay_key: Option<String>,
}
#[derive(Default)]
struct RecordingDurableEffectController {
invocations: StdMutex<Vec<DurableEffectInvocation>>,
}
impl RecordingDurableEffectController {
fn invocations(&self) -> Vec<DurableEffectInvocation> {
self.invocations
.lock()
.expect("durable effect invocations")
.clone()
}
}
#[async_trait]
impl lash_core::RuntimeEffectController for RecordingDurableEffectController {
fn durability_tier(&self) -> DurabilityTier {
DurabilityTier::Durable
}
fn requires_durable_attachment_store(&self) -> bool {
true
}
async fn execute_effect(
&self,
envelope: lash_core::RuntimeEffectEnvelope,
local_executor: lash_core::RuntimeEffectLocalExecutor<'_>,
) -> std::result::Result<lash_core::RuntimeEffectOutcome, lash_core::RuntimeEffectControllerError>
{
self.invocations
.lock()
.expect("durable effect invocations")
.push(DurableEffectInvocation {
kind: envelope.invocation.effect_kind().expect("effect kind"),
turn_id: envelope.invocation.scope.turn_id.clone(),
replay_key: envelope.invocation.replay_key().map(ToOwned::to_owned),
});
local_executor.execute(envelope).await
}
}
struct BlockingAppTools {
entered_tx: StdMutex<Option<oneshot::Sender<()>>>,
release_rx: TokioMutex<Option<oneshot::Receiver<()>>>,
}
impl BlockingAppTools {
fn new(entered_tx: oneshot::Sender<()>, release_rx: oneshot::Receiver<()>) -> Self {
Self {
entered_tx: StdMutex::new(Some(entered_tx)),
release_rx: TokioMutex::new(Some(release_rx)),
}
}
}
#[async_trait]
impl ToolProvider for BlockingAppTools {
fn tool_manifests(&self) -> Vec<lash_core::ToolManifest> {
vec![app_tool_definition().manifest()]
}
fn resolve_contract(&self, name: &str) -> Option<Arc<lash_core::ToolContract>> {
(name == "app_lookup").then(|| Arc::new(app_tool_definition().contract()))
}
async fn execute(&self, call: lash_core::ToolCall<'_>) -> lash_core::ToolResult {
assert_eq!(call.name, "app_lookup");
if let Some(tx) = self.entered_tx.lock().expect("entered tx").take() {
let _ = tx.send(());
}
if let Some(rx) = self.release_rx.lock().await.take() {
let _ = rx.await;
}
lash_core::ToolResult::ok(serde_json::json!({ "answer": "ready" }))
}
}
#[tokio::test]
async fn turn_builder_into_stream_emits_activities_and_finishes() -> Result<()> {
let core = standard_core();
let session = core.session("turn-stream").open().await?;
let mut stream = session
.turn(TurnInput::text("stream me"))
.into_stream(turn_scope(&session.session_id()))?;
let mut activities = Vec::new();
while let Some(activity) = stream.next_activity().await {
activities.push(activity?);
}
let result = stream.finish().await?;
assert!(matches!(
result.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { .. })
));
assert_eq!(assistant_prose(&activities), "echo: stream me");
assert!(
activities
.iter()
.any(|activity| matches!(&activity.event, TurnEvent::AssistantProseDelta { .. }))
);
Ok(())
}
#[tokio::test]
async fn session_observation_replays_live_activity_and_commit() -> Result<()> {
let core = standard_core();
let session = core.session("session-observation-replay").open().await?;
let cursor = session.observe().current_observation().cursor;
let output = session
.turn(TurnInput::text("observe me"))
.run(turn_scope(&session.session_id()))
.await?;
assert_eq!(assistant_prose(&output.activities), "echo: observe me");
let replay = session.observe().resume_from_cursor(&cursor)?;
let SessionResume::Replayed { events } = replay else {
panic!("recent cursor should replay live events");
};
assert!(events.iter().any(|event| {
matches!(
&event.payload,
lash_core::SessionObservationEventPayload::TurnActivity(activity)
if matches!(
&activity.event,
TurnEvent::AssistantProseDelta { text } if text == "echo: observe me"
)
)
}));
assert!(events.iter().any(|event| {
matches!(
&event.payload,
lash_core::SessionObservationEventPayload::Committed { .. }
)
}));
Ok(())
}
#[tokio::test]
async fn session_observation_rejects_cursor_from_another_session() -> Result<()> {
let core = standard_core();
let session = core.session("session-observation-a").open().await?;
let other = core.session("session-observation-b").open().await?;
let other_cursor = other.observe().current_observation().cursor;
let err = session
.observe()
.resume_from_cursor(&other_cursor)
.expect_err("cursor from another session should be rejected");
assert!(
err.to_string().contains("session-observation-b")
&& err.to_string().contains("session-observation-a"),
"unexpected error: {err}"
);
Ok(())
}
#[tokio::test]
async fn session_observation_subscription_replays_buffered_events_before_live_events() -> Result<()>
{
let core = standard_core();
let session = core
.session("session-observation-subscribe-replay")
.open()
.await?;
let cursor = session.observe().current_observation().cursor;
session
.turn(TurnInput::text("first observed"))
.run(turn_scope(&session.session_id()))
.await?;
let SessionObservationSubscription::Subscribed(mut subscription) =
session.observe().subscribe_from_cursor(&cursor)?
else {
panic!("recent cursor should subscribe without a gap");
};
loop {
let event =
tokio::time::timeout(std::time::Duration::from_secs(2), subscription.next_event())
.await
.expect("timed out waiting for replayed event")
.expect("replayed event");
if observation_assistant_delta(&event).as_deref() == Some("echo: first observed") {
break;
}
}
session
.turn(TurnInput::text("second observed"))
.run(turn_scope(&session.session_id()))
.await?;
loop {
let event =
tokio::time::timeout(std::time::Duration::from_secs(2), subscription.next_event())
.await
.expect("timed out waiting for live event")
.expect("live event");
if observation_assistant_delta(&event).as_deref() == Some("echo: second observed") {
break;
}
}
Ok(())
}
fn observation_assistant_delta(event: &lash_core::SessionObservationEvent) -> Option<String> {
match &event.payload {
lash_core::SessionObservationEventPayload::TurnActivity(activity) => {
match &activity.event {
TurnEvent::AssistantProseDelta { text } => Some(text.clone()),
_ => None,
}
}
_ => None,
}
}
#[tokio::test]
async fn turn_stream_finish_returns_last_assistant_prose_group() -> Result<()> {
let core = explicit_ephemeral_facets(LashCore::standard())
.provider(semantic_group_provider())
.model(mock_model_spec())
.build()?;
let session = core.session("turn-stream-last-group").open().await?;
let mut stream = session
.turn(TurnInput::text("stream groups"))
.into_stream(turn_scope(&session.session_id()))?;
let mut activities = Vec::new();
while let Some(activity) = stream.next_activity().await {
activities.push(activity?);
}
let result = stream.finish().await?;
assert_eq!(assistant_prose(&activities), "firstsecond");
assert_eq!(result.assistant_message(), Some("second"));
assert!(result.is_success());
Ok(())
}
#[tokio::test]
async fn turn_run_collects_activities_and_returns_last_assistant_prose_group() -> Result<()> {
let core = explicit_ephemeral_facets(LashCore::standard())
.provider(semantic_group_provider())
.model(mock_model_spec())
.build()?;
let session = core.session("turn-run-last-group").open().await?;
let collected = session
.turn(TurnInput::text("run groups"))
.run(turn_scope(&session.session_id()))
.await?;
assert_eq!(assistant_prose(&collected.activities), "firstsecond");
assert_eq!(collected.result.assistant_message(), Some("second"));
Ok(())
}
#[tokio::test]
async fn retry_status_streams_as_semantic_turn_event() -> Result<()> {
let core = explicit_ephemeral_facets(LashCore::standard())
.provider(retry_once_provider())
.model(mock_model_spec())
.build()?;
let session = core.session("retry-status").open().await?;
let events = RecordingEvents::default();
let result = session
.turn(TurnInput::text("hello"))
.stream(&events, turn_scope(&session.session_id()))
.await?;
assert!(matches!(
result.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { .. })
));
let retry = events
.snapshot()
.await
.into_iter()
.find(|event| matches!(&event.event, TurnEvent::RetryStatus { .. }))
.expect("retry status event");
let TurnEvent::RetryStatus {
wait_seconds,
attempt,
max_attempts,
reason,
} = retry.event
else {
unreachable!();
};
assert_eq!(wait_seconds, 0);
assert_eq!(attempt, 1);
assert_eq!(max_attempts, 2);
assert!(reason.contains("retry me"));
Ok(())
}
#[tokio::test]
async fn control_turn_accepts_prebuilt_turn_input() -> Result<()> {
let core = standard_core();
let session = core.session("raw-turn").open().await?;
let mut input = TurnInput::text("raw input");
input.trace_turn_id = Some("host-trace-id".to_string());
let result = session
.turn(input)
.run(inline_scope(lash_core::EffectScope::turn(
session.session_id(),
"host-trace-id",
)))
.await?;
assert_eq!(assistant_prose(&result.activities), "echo: raw input");
Ok(())
}
#[tokio::test]
async fn queued_input_acceptance_streams_semantic_ack_with_id() -> Result<()> {
let (entered_tx, entered_rx) = oneshot::channel();
let (release_tx, release_rx) = oneshot::channel();
let core = explicit_ephemeral_facets(LashCore::standard())
.provider(checkpoint_gated_provider(entered_tx, release_rx))
.model(mock_model_spec())
.store_factory(Arc::new(lash_core::InMemorySessionStoreFactory::new()))
.build()?;
let session = core.session("queued-input").open().await?;
let events = Arc::new(RecordingEvents::default());
let turn_session = session.clone();
let turn_events = Arc::clone(&events);
let scoped_effect_controller = turn_scope(&turn_session.session_id());
let turn = tokio::spawn(async move {
turn_session
.turn(TurnInput::text("hello"))
.stream(turn_events.as_ref(), scoped_effect_controller)
.await
});
entered_rx.await.expect("provider entered first call");
session
.control()
.injection()
.inject_turn_input(
Some("queue-1".to_string()),
lash_core::PluginMessage::text(lash_core::MessageRole::User, "queued follow-up"),
)
.await?;
release_tx.send(()).expect("release provider");
let result = turn.await.expect("turn task")?;
assert!(matches!(
result.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { .. })
));
let events = events.snapshot().await;
assert!(events.iter().any(|event| matches!(
&event.event,
TurnEvent::QueuedInputAccepted {
checkpoint: lash_core::CheckpointKind::BeforeCompletion,
inputs,
..
} if inputs.iter().any(|input| input.id.as_deref() == Some("queue-1"))
)));
let prose = events
.into_iter()
.filter_map(|event| match event.event {
TurnEvent::AssistantProseDelta { text } => Some(text),
_ => None,
})
.collect::<String>();
assert!(prose.contains("after queued follow-up"));
Ok(())
}
#[tokio::test]
async fn turn_stream_receives_semantic_activities() -> Result<()> {
let core = standard_core();
let session = core.session("semantic-stream").open().await?;
let turn_events = RecordingEvents::default();
let result = session
.turn(TurnInput::text("semantic stream"))
.cancel(CancellationToken::new())
.stream(&turn_events, turn_scope(&session.session_id()))
.await?;
assert!(matches!(
result.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { .. })
));
assert!(
turn_events
.snapshot()
.await
.iter()
.any(|event| matches!(&event.event, TurnEvent::AssistantProseDelta { .. }))
);
Ok(())
}
#[tokio::test]
async fn run_collects_ordered_assistant_prose_activity() -> Result<()> {
let core = standard_core();
let session = core.session("main").open().await?;
let result = session
.run(
TurnInput::text("visible"),
turn_scope(&session.session_id()),
)
.await?;
assert_eq!(assistant_prose(&result.activities), "echo: visible");
assert!(
result
.activities
.iter()
.any(|activity| matches!(&activity.event, TurnEvent::AssistantProseDelta { .. }))
);
assert!(
!result
.activities
.iter()
.any(|activity| matches!(&activity.event, TurnEvent::ToolCallCompleted { .. }))
);
assert!(
!result
.activities
.iter()
.any(|activity| matches!(&activity.event, TurnEvent::CodeBlockCompleted { .. }))
);
assert!(matches!(
result.result.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { .. })
));
assert_eq!(result.result.usage.output_tokens, 2);
Ok(())
}
#[tokio::test]
async fn private_run_collector_records_ordered_activities() -> Result<()> {
let collector = RunActivityCollector::default();
collector
.emit(test_activity(
"code-1",
TurnEvent::CodeBlockStarted {
language: "lashlang".to_string(),
code: "x = await tools.app_lookup({})?".to_string(),
graph_key: None,
},
))
.await;
collector
.emit(test_activity(
"tool-1",
TurnEvent::ToolCallCompleted {
call_id: Some("call-1".to_string()),
name: "app_lookup".to_string(),
args: serde_json::json!({}),
output: lash_core::ToolCallOutput::success(serde_json::json!({ "ok": true })),
duration_ms: 3,
},
))
.await;
collector
.emit(test_activity(
"code-1",
TurnEvent::CodeBlockCompleted {
language: "lashlang".to_string(),
output: String::new(),
error: None,
success: true,
duration_ms: 4,
tool_call_ids: vec!["call-1".to_string()],
graph_key: None,
},
))
.await;
let activities = collector.snapshot();
assert_eq!(activities.len(), 3);
assert!(matches!(
&activities[0].event,
TurnEvent::CodeBlockStarted { language, code, .. }
if language == "lashlang" && code == "x = await tools.app_lookup({})?"
));
assert!(matches!(
&activities[1].event,
TurnEvent::ToolCallCompleted { name, output, .. }
if name == "app_lookup" && output.value_for_projection() == serde_json::json!({ "ok": true })
));
assert_eq!(activities[0].correlation_id, activities[2].correlation_id);
assert!(matches!(
&activities[2].event,
TurnEvent::CodeBlockCompleted { language, success, .. }
if language == "lashlang" && *success
));
Ok(())
}
#[tokio::test]
async fn turn_event_fanout_streams_to_collector_and_live_sink() -> Result<()> {
let live = Arc::new(RecordingEvents::default());
let core = explicit_ephemeral_facets(LashCore::standard())
.provider(tool_roundtrip_provider())
.model(mock_model_spec())
.tools(Arc::new(AppTools))
.store_factory(Arc::new(lash_core::InMemorySessionStoreFactory::new()))
.process_registry(Arc::new(TestLocalProcessRegistry::default()))
.build()?;
let session = core.session("fanout-tool-events").open().await?;
let output = session
.turn(TurnInput::text("use tool"))
.collect_with(live.as_ref(), turn_scope(&session.session_id()))
.await?;
assert!(matches!(
output.result.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { .. })
));
assert_eq!(
serde_json::to_value(&output.activities).expect("recorded activities serialize"),
serde_json::to_value(live.snapshot().await).expect("live activities serialize")
);
assert_eq!(assistant_prose(&output.activities), "done");
assert_eq!(output.assistant_message(), Some("done"));
assert!(output.is_success());
let tool_completed = output
.activities
.iter()
.find(|activity| matches!(&activity.event, TurnEvent::ToolCallCompleted { .. }))
.expect("tool completion");
assert!(matches!(
&tool_completed.event,
TurnEvent::ToolCallCompleted { name, output, .. }
if name == "app_lookup" && output.value_for_projection() == serde_json::json!({ "ok": true })
));
Ok(())
}
#[tokio::test]
async fn stream_returns_terminal_metadata_without_prose() -> Result<()> {
let core = standard_core();
let session = core.session("semantic-events").open().await?;
let events = RecordingEvents::default();
let result = session
.turn(TurnInput::text("stream"))
.stream(&events, turn_scope(&session.session_id()))
.await?;
assert!(matches!(
result.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { .. })
));
let prose = events
.snapshot()
.await
.into_iter()
.filter_map(|event| match event.event {
TurnEvent::AssistantProseDelta { text } => Some(text),
_ => None,
})
.collect::<String>();
assert_eq!(prose, "echo: stream");
assert!(!events.snapshot().await.iter().any(|event| matches!(
&event.event,
TurnEvent::SubmittedValue { .. } | TurnEvent::ToolValue { .. }
)));
Ok(())
}
#[tokio::test]
async fn stream_emits_chronological_tool_events_without_prose_pollution() -> Result<()> {
let core = explicit_ephemeral_facets(LashCore::standard())
.provider(tool_roundtrip_provider())
.model(mock_model_spec())
.tools(Arc::new(AppTools))
.store_factory(Arc::new(lash_core::InMemorySessionStoreFactory::new()))
.process_registry(Arc::new(TestLocalProcessRegistry::default()))
.build()?;
let session = core.session("tool-events").open().await?;
let events = RecordingEvents::default();
let collected = session
.turn(TurnInput::text("use tool"))
.stream(&events, turn_scope(&session.session_id()))
.await?;
assert!(matches!(
collected.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { .. })
));
let events = events.snapshot().await;
let started = events
.iter()
.position(|event| matches!(&event.event, TurnEvent::ToolCallStarted { .. }))
.expect("tool start event");
let completed = events
.iter()
.position(|event| matches!(&event.event, TurnEvent::ToolCallCompleted { .. }))
.expect("tool completed event");
assert!(started < completed);
let TurnEvent::ToolCallCompleted { output, .. } = &events[completed].event else {
unreachable!();
};
assert_eq!(
output.value_for_projection(),
serde_json::json!({ "ok": true })
);
let prose = events
.into_iter()
.filter_map(|event| match event.event {
TurnEvent::AssistantProseDelta { text } => Some(text),
_ => None,
})
.collect::<String>();
assert_eq!(prose, "done");
assert!(!prose.contains("ok"));
Ok(())
}
#[test]
fn rlm_tool_calls_stream_from_live_exec_boundary() -> Result<()> {
run_async_test_on_stack_budget("rlm-live-exec-boundary-test", || {
rlm_tool_calls_stream_from_live_exec_boundary_inner()
})
}
async fn rlm_tool_calls_stream_from_live_exec_boundary_inner() -> Result<()> {
let core = explicit_ephemeral_facets(LashCore::rlm())
.provider(queued_text_provider(vec![
"```lashlang\nvalue = await tools.app_lookup({})?\nsubmit \"done\"\n```",
]))
.model(mock_model_spec())
.tools(Arc::new(AppTools))
.store_factory(Arc::new(lash_core::InMemorySessionStoreFactory::new()))
.process_registry(Arc::new(TestLocalProcessRegistry::default()))
.build()?;
let session = core.session("rlm-live-tool-events").open().await?;
let events = Arc::new(RecordingEvents::default());
let result = session
.turn(TurnInput::text("use tool"))
.stream(events.as_ref(), turn_scope(&session.session_id()))
.await?;
assert!(matches!(
result.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::SubmittedValue { .. })
));
let events = events.snapshot().await;
let code_started = events
.iter()
.position(|event| matches!(&event.event, TurnEvent::CodeBlockStarted { .. }))
.expect("code started");
let tool_started = events
.iter()
.position(|event| matches!(&event.event, TurnEvent::ToolCallStarted { .. }))
.expect("tool started");
let tool_completed = events
.iter()
.position(|event| matches!(&event.event, TurnEvent::ToolCallCompleted { .. }))
.expect("tool completed");
let code_completed = events
.iter()
.position(|event| matches!(&event.event, TurnEvent::CodeBlockCompleted { .. }))
.expect("code completed");
let terminal_output = events
.iter()
.position(|event| matches!(&event.event, TurnEvent::SubmittedValue { .. }))
.expect("terminal output");
assert!(code_started < tool_started);
assert!(tool_started < tool_completed);
assert!(tool_completed < code_completed);
assert!(code_completed < terminal_output);
assert!(!events[code_completed + 1..].iter().any(|event| matches!(
&event.event,
TurnEvent::ToolCallStarted { .. } | TurnEvent::ToolCallCompleted { .. }
)));
let TurnEvent::ToolCallCompleted {
call_id, output, ..
} = &events[tool_completed].event
else {
unreachable!();
};
assert_eq!(
output.value_for_projection(),
serde_json::json!({ "ok": true })
);
let TurnEvent::CodeBlockStarted {
graph_key: started_graph_key,
..
} = &events[code_started].event
else {
unreachable!();
};
assert!(
started_graph_key
.as_deref()
.is_some_and(|key| key.starts_with("effect:rlm-live-tool-events:")),
"missing foreground graph key on CodeBlockStarted: {started_graph_key:?}"
);
let TurnEvent::CodeBlockCompleted {
language,
success,
error,
tool_call_ids,
graph_key: completed_graph_key,
..
} = &events[code_completed].event
else {
unreachable!();
};
assert_eq!(language, "lashlang");
assert!(*success);
assert!(error.is_none());
assert_eq!(call_id.as_ref(), tool_call_ids.first());
assert_eq!(tool_call_ids.len(), 1);
assert_eq!(completed_graph_key, started_graph_key);
let read_view = result.state.read_view();
assert!(
read_view.messages().iter().all(|message| message
.parts
.iter()
.all(|part| part.tool_call_id.as_ref() != tool_call_ids.first())),
"live RLM tool calls should not be persisted as message history"
);
assert_eq!(
read_view
.materialized_session_graph()
.active_path_nodes()
.into_iter()
.filter_map(|node| node.event())
.filter(|event| matches!(event, lash_core::SessionEventRecord::Conversation(_)))
.count(),
read_view.messages().len()
);
let TurnEvent::SubmittedValue { value } = &events[terminal_output].event else {
unreachable!();
};
assert_eq!(value, &serde_json::json!("done"));
Ok(())
}
#[test]
fn continue_as_observation_emits_frame_switch_then_commit() -> Result<()> {
run_async_test_on_stack_budget("continue-as-observation-test", || {
continue_as_observation_emits_frame_switch_then_commit_inner()
})
}
async fn continue_as_observation_emits_frame_switch_then_commit_inner() -> Result<()> {
let core = explicit_ephemeral_facets(LashCore::rlm())
.provider(queued_text_provider(vec![
"```lashlang\nawait control.continue_as({ task: \"finish in a fresh frame\" })?\n```",
"```lashlang\nsubmit \"done after continue_as\"\n```",
]))
.model(mock_model_spec())
.store_factory(Arc::new(lash_core::InMemorySessionStoreFactory::new()))
.build()?;
let session = core.session("continue-as-observation").open().await?;
let cursor = session.observe().current_observation().cursor;
let output = session
.turn(TurnInput::text("switch frames"))
.run(turn_scope(&session.session_id()))
.await?;
assert_eq!(
output.submitted_value(),
Some(&serde_json::json!("done after continue_as"))
);
let SessionResume::Replayed { events } = session.observe().resume_from_cursor(&cursor)? else {
panic!("recent cursor should replay continue_as observation events");
};
assert!(
events.windows(2).any(|window| matches!(
(&window[0].payload, &window[1].payload),
(
lash_core::SessionObservationEventPayload::AgentFrameSwitched { .. },
lash_core::SessionObservationEventPayload::Committed { .. }
)
)),
"expected AgentFrameSwitched immediately followed by Committed, got {events:?}"
);
Ok(())
}
#[test]
fn durable_agent_frame_follow_through_uses_distinct_turn_scopes_and_commits() -> Result<()> {
run_async_test_on_stack_budget("durable-agent-frame-follow-through-test", || {
durable_agent_frame_follow_through_uses_distinct_turn_scopes_and_commits_inner()
})
}
async fn durable_agent_frame_follow_through_uses_distinct_turn_scopes_and_commits_inner()
-> Result<()> {
let dir = tempfile::tempdir().expect("tempdir");
let session_id = "agent-frame-durable";
let root_turn_id = "agent-frame-root-turn";
let store_factory = Arc::new(lash_sqlite_store::SqliteSessionStoreFactory::new(
dir.path().join("sessions"),
));
let artifact_store = Arc::new(
lash_sqlite_store::Store::open(&dir.path().join("artifacts.db"))
.await
.expect("open artifact store"),
);
let process_registry = Arc::new(
lash_sqlite_store::SqliteProcessRegistry::open(&dir.path().join("processes.db"))
.await
.expect("open process registry"),
);
let host_event_store = Arc::new(
lash_sqlite_store::SqliteHostEventStore::open(&dir.path().join("host-events.db"))
.await
.expect("open host event store"),
);
let controller = Arc::new(RecordingDurableEffectController::default());
let scoped_effect_controller = ScopedEffectController::borrowed(
controller.as_ref(),
lash_core::EffectScope::turn(session_id, root_turn_id),
)
.expect("scoped durable effect controller");
let core = explicit_ephemeral_facets(LashCore::standard())
.provider(agent_frame_switch_provider())
.model(mock_model_spec())
.tools(Arc::new(AgentFrameSwitchTools))
.store_factory(store_factory.clone())
.attachment_store(Arc::new(crate::persistence::FileAttachmentStore::new(
dir.path().join("attachments"),
)))
.lashlang_artifact_store(artifact_store)
.host_event_store(host_event_store)
.process_registry(process_registry)
.build()?;
let session = core.session(session_id).open().await?;
let mut input = TurnInput::text("switch frames");
input.trace_turn_id = Some(root_turn_id.to_string());
let output = session.turn(input).run(scoped_effect_controller).await?;
assert_eq!(output.assistant_message(), Some("done after frame switch"));
let follow_turn_id = format!("{root_turn_id}:agent-frame:1");
let mut llm_turn_ids = controller
.invocations()
.into_iter()
.filter(|record| record.kind == lash_core::RuntimeEffectKind::LlmCall)
.map(|record| record.turn_id.expect("turn-scoped LLM effect"))
.collect::<Vec<_>>();
llm_turn_ids.sort();
llm_turn_ids.dedup();
assert_eq!(
llm_turn_ids,
vec![root_turn_id.to_string(), follow_turn_id.clone()]
);
let replay_keys = controller
.invocations()
.into_iter()
.filter_map(|record| record.replay_key)
.collect::<Vec<_>>();
assert!(
replay_keys.iter().any(|key| key.contains(root_turn_id)),
"root turn replay keys should include {root_turn_id}: {replay_keys:?}"
);
assert!(
replay_keys.iter().any(|key| key.contains(&follow_turn_id)),
"follow turn replay keys should include {follow_turn_id}: {replay_keys:?}"
);
let conn = rusqlite::Connection::open(store_factory.path_for_session(session_id))
.expect("open session sqlite store");
let mut stmt = conn
.prepare("SELECT turn_id FROM runtime_turn_commits ORDER BY turn_id ASC")
.expect("prepare turn commits");
let turn_commit_ids = stmt
.query_map([], |row| row.get::<_, String>(0))
.expect("query turn commits")
.map(|row| row.expect("read turn commit row"))
.collect::<Vec<_>>();
assert_eq!(
turn_commit_ids,
vec![root_turn_id.to_string(), follow_turn_id]
);
Ok(())
}
#[test]
fn process_control_lists_started_lashlang_process_until_awaited() -> Result<()> {
run_async_test_on_stack_budget("process-control-lashlang-process-test", || {
process_control_lists_started_lashlang_process_until_awaited_inner()
})
}
async fn process_control_lists_started_lashlang_process_until_awaited_inner() -> Result<()> {
let (entered_tx, entered_rx) = oneshot::channel();
let (release_tx, release_rx) = oneshot::channel();
let core = explicit_ephemeral_facets(LashCore::rlm())
.provider(queued_text_provider(vec![
r#"```lashlang
process lookup(tools: Tools) {
value = await tools.app_lookup({})?
finish value
}
h = start lookup(tools: tools)
value = await h
submit value
```"#,
]))
.model(mock_model_spec())
.tools(Arc::new(BlockingAppTools::new(entered_tx, release_rx)))
.store_factory(Arc::new(lash_core::InMemorySessionStoreFactory::new()))
.process_registry(Arc::new(TestLocalProcessRegistry::default()))
.build()?;
let session = core.session("rlm-process-control-tool").open().await?;
let turn_session = session.clone();
let scoped_effect_controller = turn_scope(&turn_session.session_id());
let turn = tokio::spawn(async move {
turn_session
.turn(TurnInput::text("start tool"))
.run(scoped_effect_controller)
.await
});
tokio::time::timeout(std::time::Duration::from_secs(5), entered_rx)
.await
.expect("tool process should start")
.expect("tool provider entered");
let processes = session.process_control().list().await?;
let running_app_lookup = processes.iter().any(|process| {
process.descriptor.kind.as_deref() == Some("lashlang")
&& process.descriptor.label.as_deref() == Some("lookup")
&& !process.status.is_terminal()
});
assert!(
running_app_lookup,
"expected running lookup lashlang process, got {processes:?}"
);
release_tx.send(()).expect("release tool provider");
let result = turn.await.expect("turn task")?;
assert_eq!(
result.submitted_value(),
Some(&serde_json::json!({
"ok": true,
"value": { "answer": "ready" },
}))
);
Ok(())
}
#[test]
fn lashlang_execution_graph_store_observes_lashlang_process_from_facade() -> Result<()> {
run_async_test_on_stack_budget("lashlang-graph-store-facade-test", || {
lashlang_execution_graph_store_observes_lashlang_process_from_facade_inner()
})
}
async fn lashlang_execution_graph_store_observes_lashlang_process_from_facade_inner() -> Result<()>
{
let (entered_tx, entered_rx) = oneshot::channel();
let (release_tx, release_rx) = oneshot::channel();
let graph_store = Arc::new(crate::tracing::TraceLashlangGraphStore::default());
let core = explicit_ephemeral_facets(LashCore::rlm())
.provider(queued_text_provider(vec![
r#"```lashlang
process lookup(tools: Tools) {
value = await tools.app_lookup({})?
finish value
}
h = start lookup(tools: tools)
value = await h
submit value
```"#,
]))
.model(mock_model_spec())
.tools(Arc::new(BlockingAppTools::new(entered_tx, release_rx)))
.store_factory(Arc::new(lash_core::InMemorySessionStoreFactory::new()))
.process_registry(Arc::new(TestLocalProcessRegistry::default()))
.lashlang_execution_sink(Some(
Arc::clone(&graph_store) as Arc<dyn crate::tracing::TraceSink>
))
.build()?;
let session = core.session("rlm-lashlang-graph-store").open().await?;
let turn_session = session.clone();
let scoped_effect_controller = turn_scope(&turn_session.session_id());
let turn = tokio::spawn(async move {
turn_session
.turn(TurnInput::text("start tool"))
.run(scoped_effect_controller)
.await
});
tokio::time::timeout(std::time::Duration::from_secs(5), entered_rx)
.await
.expect("tool process should start")
.expect("tool provider entered");
let processes = session.process_control().list().await?;
let running = processes
.iter()
.find(|process| process.descriptor.label.as_deref() == Some("lookup"))
.expect("running lookup process");
let graph = graph_store
.graph(&format!("process:{}", running.process_id))
.expect("Lashlang graph snapshot");
assert_eq!(graph.graph_key, format!("process:{}", running.process_id));
assert_eq!(graph.entry_kind, "process");
assert_eq!(graph.entry_name, "lookup");
assert_eq!(graph.status, lash_core::TraceLashlangStatus::Running);
assert!(!graph.nodes.is_empty());
assert!(
graph_store
.graphs()
.iter()
.any(|graph| graph.entry_name == "lookup")
);
release_tx.send(()).expect("release tool provider");
let _ = turn.await.expect("turn task")?;
Ok(())
}
#[tokio::test]
async fn prose_or_submit_rlm_completion_emits_no_terminal_output() -> Result<()> {
let core = explicit_ephemeral_facets(LashCore::rlm())
.provider(queued_text_provider(vec!["done in prose"]))
.model(mock_model_spec())
.build()?;
let session = core.session("rlm-prose-completion").open().await?;
let events = Arc::new(RecordingEvents::default());
let result = session
.turn(TurnInput::text("answer directly"))
.allow_prose_or_submit()?
.stream(events.as_ref(), turn_scope(&session.session_id()))
.await?;
assert!(matches!(
result.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { .. })
));
let events = events.snapshot().await;
assert!(!events.iter().any(|event| matches!(
&event.event,
TurnEvent::SubmittedValue { .. } | TurnEvent::ToolValue { .. }
)));
assert_eq!(assistant_prose(&events), "done in prose");
let read_view = result.state.read_view();
let assistant_messages = read_view
.messages()
.iter()
.filter(|message| message.role == lash_core::MessageRole::Assistant)
.collect::<Vec<_>>();
assert_eq!(assistant_messages.len(), 1);
assert_eq!(assistant_messages[0].parts[0].content, "done in prose");
Ok(())
}
#[tokio::test]
async fn submit_required_rlm_completion_emits_terminal_output() -> Result<()> {
let core = explicit_ephemeral_facets(LashCore::rlm())
.provider(queued_text_provider(vec![
"```lashlang\nsubmit \"done via submit\"\n```",
]))
.model(mock_model_spec())
.build()?;
let session = core
.session("rlm-submit-required-completion")
.open()
.await?;
let events = Arc::new(RecordingEvents::default());
let result = session
.turn(TurnInput::text("submit"))
.require_submit()?
.stream(events.as_ref(), turn_scope(&session.session_id()))
.await?;
assert!(matches!(
result.outcome,
TurnOutcome::Finished(lash_core::TurnFinish::SubmittedValue { .. })
));
assert_eq!(
result.submitted_value(),
Some(&serde_json::json!("done via submit"))
);
let events = events.snapshot().await;
let terminal_output = events
.iter()
.find(|event| matches!(&event.event, TurnEvent::SubmittedValue { .. }))
.expect("terminal output");
let TurnEvent::SubmittedValue { value } = &terminal_output.event else {
unreachable!();
};
assert_eq!(value, &serde_json::json!("done via submit"));
Ok(())
}
#[tokio::test]
async fn rlm_failed_code_emits_failed_code_completion_without_fake_tools() -> Result<()> {
let core = explicit_ephemeral_facets(LashCore::rlm())
.provider(queued_text_provider(vec![
"```lashlang\nthis is not valid lashlang\n```",
"```lashlang\nsubmit \"recovered\"\n```",
]))
.model(mock_model_spec())
.tools(Arc::new(AppTools))
.build()?;
let session = core.session("rlm-failed-code-event").open().await?;
let events = RecordingEvents::default();
let _result = session
.turn(TurnInput::text("bad code"))
.stream(&events, turn_scope(&session.session_id()))
.await?;
let events = events.snapshot().await;
let failed = events
.iter()
.position(|event| {
matches!(
&event.event,
TurnEvent::CodeBlockCompleted {
success: false,
error: Some(_),
..
}
)
})
.expect("failed code completion");
let next_code = events[failed + 1..]
.iter()
.position(|event| matches!(&event.event, TurnEvent::CodeBlockStarted { .. }))
.map(|offset| failed + 1 + offset)
.unwrap_or(events.len());
assert!(
!events[failed + 1..next_code]
.iter()
.any(|event| matches!(&event.event, TurnEvent::ToolCallCompleted { .. }))
);
Ok(())
}