use std::collections::BTreeMap;
use std::sync::Arc;
use super::*;
const EXAMPLE_BINDING_KEY: &str = "example.call_path";
#[test]
fn turn_input_round_trips_remote_safe_fields() {
let mut prompt = lash_core::PromptLayer::new();
prompt.add_contribution(lash_core::PromptContribution::guidance("Guide", "remote"));
let mut input = lash_core::TurnInput::items([
lash_core::InputItem::text("a"),
lash_core::InputItem::text("b"),
lash_core::InputItem::image_ref("img"),
])
.with_image_blob("img", vec![1, 2, 3])
.with_protocol_turn_options(lash_core::ProtocolTurnOptions {
payload: serde_json::json!({ "mode": "remote" }),
})
.with_trace_turn_id("trace-1");
input.turn_context.set_prompt_layer(prompt.clone());
let remote = RemoteTurnInput::try_from(input).expect("remote conversion");
assert_eq!(remote.items.len(), 3);
assert_eq!(remote.image_blobs_base64["img"], "AQID");
assert_eq!(remote.trace_turn_id.as_deref(), Some("trace-1"));
assert_eq!(
remote.protocol_turn_options.as_ref().unwrap().payload,
serde_json::json!({ "mode": "remote" })
);
assert_eq!(remote.prompt_layer, Some(prompt.clone().into()));
let core = lash_core::TurnInput::try_from(remote).expect("core conversion");
assert_eq!(core.image_blobs["img"], vec![1, 2, 3]);
assert_eq!(core.trace_turn_id.as_deref(), Some("trace-1"));
assert_eq!(
core.protocol_turn_options.unwrap().payload,
serde_json::json!({ "mode": "remote" })
);
assert_eq!(core.turn_context.prompt_layer(), &prompt);
}
#[test]
fn turn_input_rejects_non_remote_safe_fields() {
struct DummyTurnExtension;
impl lash_core::ProtocolTurnExtension for DummyTurnExtension {
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
let mut input = lash_core::TurnInput::text("extension");
input.protocol_extension = Some(lash_core::ProtocolTurnExtensionHandle::new(
DummyTurnExtension,
));
assert!(matches!(
RemoteTurnInput::try_from(input),
Err(RemoteProtocolError::NonRemoteSafeTurnInput(message))
if message.contains("protocol turn")
));
let mut input = lash_core::TurnInput::text("live");
input.turn_context.insert_plugin_input("demo", 1_u32);
assert!(matches!(
RemoteTurnInput::try_from(input),
Err(RemoteProtocolError::NonRemoteSafeTurnInput(message))
if message.contains("live plugin")
));
let mut input = lash_core::TurnInput::text("provider");
input.turn_context.set_provider(
lash_core::testing::TestProvider::builder()
.build()
.into_handle(),
);
assert!(matches!(
RemoteTurnInput::try_from(input),
Err(RemoteProtocolError::NonRemoteSafeTurnInput(message))
if message.contains("provider")
));
let mut input = lash_core::TurnInput::text("model");
input
.turn_context
.set_model(lash_core::ModelSpec::from_token_limits("m", None, 100, None).expect("model"));
assert!(matches!(
RemoteTurnInput::try_from(input),
Err(RemoteProtocolError::NonRemoteSafeTurnInput(message))
if message.contains("model")
));
}
#[test]
fn llm_request_and_response_round_trip_owned_dtos() {
let request = core_llm::LlmRequest {
model: "gpt-test".to_string(),
messages: vec![core_llm::LlmMessage::text(core_llm::LlmRole::User, "hello")],
attachments: vec![core_llm::LlmAttachment::bytes("image/png", vec![1, 2, 3])],
tools: Arc::new(vec![core_llm::LlmToolSpec {
name: "search".to_string(),
description: "Search".to_string(),
input_schema: serde_json::json!({"type": "object"}),
output_schema: serde_json::Value::Null,
input_schema_projections: Vec::new(),
output_schema_projections: Vec::new(),
}]),
tool_choice: core_llm::LlmToolChoice::Auto,
model_variant: Some("fast".to_string()),
generation: core_llm::GenerationOptions {
output_token_cap: NonZeroUsize::new(42),
},
session_id: Some("session-1".to_string()),
output_spec: Some(core_llm::LlmOutputSpec::JsonObject),
stream_events: None,
provider_trace: None,
};
let remote = RemoteLlmRequest::from_core("request-1", request);
remote.validate().expect("valid remote request");
assert_eq!(remote.protocol_version, REMOTE_PROTOCOL_VERSION);
assert_eq!(remote.request_id, "request-1");
let core = core_llm::LlmRequest::try_from(remote).expect("core request");
assert_eq!(core.model, "gpt-test");
assert_eq!(core.model_variant.as_deref(), Some("fast"));
assert_eq!(core.attachments[0].data, vec![1, 2, 3]);
let response = core_llm::LlmResponse {
full_text: "done".to_string(),
parts: vec![core_llm::LlmOutputPart::Text {
text: "done".to_string(),
response_meta: None,
}],
usage: core_llm::LlmUsage {
input_tokens: 1,
output_tokens: 2,
cached_input_tokens: 0,
reasoning_tokens: 0,
},
terminal_reason: core_llm::LlmTerminalReason::Stop,
terminal_diagnostic: Some("ok".to_string()),
provider_usage: Some(serde_json::json!({"provider": "usage"})),
request_body: Some("{}".to_string()),
http_summary: Some("200".to_string()),
};
let remote = RemoteLlmResponse::from_core("request-1", response);
remote.validate().expect("valid remote response");
let core = core_llm::LlmResponse::from(remote);
assert_eq!(core.full_text, "done");
assert_eq!(core.terminal_reason, core_llm::LlmTerminalReason::Stop);
assert_eq!(
core.provider_usage,
Some(serde_json::json!({"provider": "usage"}))
);
}
#[test]
fn prompt_layer_round_trips_without_protocol_crate_depending_on_core_by_default() {
let template = lash_core::PromptTemplate::new(vec![lash_core::PromptTemplateSection::titled(
"Custom",
vec![lash_core::PromptTemplateEntry::slot(
lash_core::PromptSlot::Guidance,
)],
)]);
let prompt = lash_core::PromptLayer::with_template(template)
.with_contribution(lash_core::PromptContribution::guidance("Guide", "remote"));
let remote = RemotePromptLayer::from(prompt.clone());
let core = lash_core::PromptLayer::from(remote);
assert_eq!(core, prompt);
}
#[test]
fn trigger_dtos_round_trip_core_values() {
let request = lash_core::TriggerOccurrenceRequest::new(
"ui.button.pressed",
"source-key",
serde_json::json!({ "button": "Blue" }),
"button-blue-1",
)
.with_source(serde_json::json!({ "id": "blue" }));
let remote = RemoteTriggerOccurrenceRequest::from(request.clone());
remote.validate().expect("valid remote request");
let core = lash_core::TriggerOccurrenceRequest::try_from(remote).expect("core request");
assert_eq!(core, request);
let report = lash_core::TriggerEmitReport {
occurrence_id: "occurrence:1".to_string(),
started_process_ids: vec!["process:1".to_string()],
};
let remote = RemoteTriggerEmitReport::from(report.clone());
remote.validate().expect("valid remote report");
let core = lash_core::TriggerEmitReport::try_from(remote).expect("core report");
assert_eq!(core, report);
let mut filter = lash_core::TriggerSubscriptionFilter::for_source_type("ui.button.pressed");
filter.source_key = Some("source-key".to_string());
filter.enabled = Some(true);
let remote = RemoteTriggerSubscriptionFilter::from(filter.clone());
remote.validate().expect("valid remote filter");
let core = lash_core::TriggerSubscriptionFilter::try_from(remote).expect("core filter");
assert_eq!(core, filter);
let mut inputs = BTreeMap::new();
inputs.insert("event".to_string(), lash_core::TriggerInputBinding::Event);
let registration = lash_core::TriggerRegistration {
handle: "trigger:1".to_string(),
source_key: "source-key".to_string(),
name: Some("button watcher".to_string()),
source_type: lash_core::TriggerEventType::new("ui.button.pressed"),
source: serde_json::json!({}),
target: lash_core::TriggerTargetSummary {
label: Some("on_button".to_string()),
identity: engine_process_identity("on_button"),
input: engine_process_input("on_button", serde_json::json!({})),
inputs,
},
enabled: true,
};
let remote = RemoteTriggerRegistration::from(registration.clone());
let core = lash_core::TriggerRegistration::try_from(remote).expect("core registration");
assert_eq!(
serde_json::to_value(&core).expect("core registration json"),
serde_json::to_value(®istration).expect("registration json")
);
let cause = lash_core::CausalRef::TriggerOccurrence {
occurrence_id: "occurrence:1".to_string(),
};
let remote = RemoteCausalRef::from(cause.clone());
let core = lash_core::CausalRef::from(remote);
assert_eq!(core, cause);
}
#[test]
fn trigger_subscription_dtos_round_trip_core_values() {
let draft = trigger_subscription_draft();
let remote = RemoteTriggerSubscriptionDraft::try_from(draft.clone()).expect("remote draft");
remote.validate().expect("valid remote trigger draft");
let core = lash_core::TriggerSubscriptionDraft::try_from(remote).expect("core draft");
assert_eq!(
serde_json::to_value(&core).expect("core draft json"),
serde_json::to_value(&draft).expect("draft json")
);
let record = trigger_subscription_record();
let remote = RemoteTriggerSubscriptionRecord::try_from(record.clone()).expect("remote record");
remote
.validate("RemoteTriggerSubscriptionRecord")
.expect("valid remote trigger record");
let core = lash_core::TriggerSubscriptionRecord::try_from(remote).expect("core record");
assert_eq!(
serde_json::to_value(&core).expect("core record json"),
serde_json::to_value(&record).expect("record json")
);
let filter = lash_core::TriggerSubscriptionFilter {
session_id: Some("session-a".to_string()),
handle: Some("trigger:1".to_string()),
name: Some("button watcher".to_string()),
source_type: Some("ui.button.pressed".to_string()),
source_key: Some("source-key".to_string()),
target: Some(trigger_target_identity()),
enabled: Some(true),
};
let remote = RemoteTriggerSubscriptionFilter::from(filter.clone());
assert!(remote.target.is_some());
let core = lash_core::TriggerSubscriptionFilter::try_from(remote).expect("core filter");
assert_eq!(
serde_json::to_value(&core).expect("core filter json"),
serde_json::to_value(&filter).expect("filter json")
);
let request = RemoteTriggerRegisterSubscriptionRequest {
protocol_version: REMOTE_PROTOCOL_VERSION,
draft: RemoteTriggerSubscriptionDraft::try_from(draft).expect("remote request draft"),
};
let core = lash_core::TriggerSubscriptionDraft::try_from(request).expect("register request");
assert_eq!(core.source_key, "source-key");
let result =
RemoteTriggerRegisterSubscriptionResult::try_from(record.clone()).expect("remote result");
let core = lash_core::TriggerSubscriptionRecord::try_from(result).expect("register result");
assert_eq!(core.subscription_id, record.subscription_id);
let response =
RemoteTriggerListSubscriptionsResponse::try_from(vec![record.clone()]).expect("response");
let core_records =
Vec::<lash_core::TriggerSubscriptionRecord>::try_from(response).expect("list response");
assert_eq!(core_records.len(), 1);
assert_eq!(
serde_json::to_value(&core_records[0]).expect("core record json"),
serde_json::to_value(&record).expect("record json")
);
let cancel = RemoteTriggerCancelSubscriptionRequest {
protocol_version: REMOTE_PROTOCOL_VERSION,
session_id: "session-a".to_string(),
handle: "trigger:1".to_string(),
};
cancel.validate().expect("valid cancel request");
let cancel_result = RemoteTriggerCancelSubscriptionResult {
protocol_version: REMOTE_PROTOCOL_VERSION,
session_id: cancel.session_id.clone(),
handle: cancel.handle.clone(),
cancelled: true,
};
cancel_result.validate().expect("valid cancel result");
}
#[test]
fn process_start_requests_round_trip_core_values() {
let external = lash_core::ProcessStartRequest::external(
"process:external",
lash_core::ProcessOriginator::host(),
serde_json::json!({ "label": "External" }),
)
.with_wake_target(Some(lash_core::SessionScope::new("session-a")))
.with_grant(Some(lash_core::ProcessStartGrant {
session_scope: lash_core::SessionScope::new("session-a"),
descriptor: lash_core::ProcessHandleDescriptor::new(Some("external"), Some("External")),
}))
.with_event_types([process_event_type()]);
assert_process_start_roundtrip(external);
let lashlang = lash_core::ProcessStartRequest::new(
"process:lashlang",
engine_process_input("main", serde_json::json!({ "event": true })),
lash_core::ProcessOriginator::session(lash_core::SessionScope::new("session-a")),
)
.with_env_spec(lash_core::ProcessExecutionEnvSpec::new(
lash_core::PluginOptions::typed(
"snapshot-tools",
serde_json::json!({ "snapshot_ref": "tool-authority:sha256:abc" }),
)
.expect("plugin options"),
lash_core::SessionPolicy {
provider_id: "process-provider".to_string(),
model: lash_core::ModelSpec::from_token_limits("process-model", None, 4096, Some(512))
.expect("model"),
..Default::default()
},
))
.with_event_types([process_event_type()]);
assert_process_start_roundtrip(lashlang);
let session_turn = lash_core::ProcessStartRequest::new(
"process:session-turn",
lash_core::ProcessInput::SessionTurn {
create_request: Box::new(
lash_core::SessionCreateRequest::child_session(
"session-a",
lash_core::SessionStartPoint::Empty,
Default::default(),
)
.with_session_id("child-session"),
),
turn_input: Box::new(lash_core::TurnInput::text("hello child")),
output_contract: lash_core::ToolOutputContract::from_input_schema(
"schema",
Some(serde_json::json!({ "type": "object" })),
),
},
lash_core::ProcessOriginator::host(),
);
assert_process_start_roundtrip(session_turn);
}
#[test]
fn process_records_events_snapshots_and_results_round_trip_core_values() {
let mut record = process_record("process:record");
record.status = lash_core::ProcessStatus::Completed {
await_output: lash_core::ProcessAwaitOutput::Success {
value: serde_json::json!({ "done": true }),
control: None,
},
};
let remote = RemoteProcessRecord::try_from(record.clone()).expect("remote record");
remote
.validate("RemoteProcessRecord")
.expect("valid remote record");
let core = lash_core::ProcessRecord::try_from(remote).expect("core record");
assert_eq!(core.id, record.id);
assert_eq!(core.status.label(), record.status.label());
assert_eq!(
serde_json::to_value(core.input.as_ref()).expect("core input json"),
serde_json::to_value(record.input.as_ref()).expect("record input json")
);
let summary = lash_core::ProcessHandleSummary::new(
"process:record",
lash_core::ProcessHandleDescriptor::new(Some("external"), Some("External")),
lash_core::ProcessLifecycleStatus::Completed,
)
.with_definition(Some(process_definition_identity("main")));
let remote = RemoteProcessSummary::from(summary.clone());
remote
.validate("RemoteProcessSummary")
.expect("valid summary");
let core = lash_core::ProcessHandleSummary::try_from(remote).expect("core summary");
assert_eq!(core.process_id, summary.process_id);
assert_eq!(core.status, summary.status);
let event = process_event("process:record");
let remote = RemoteProcessEvent::from(event.clone());
remote
.validate("RemoteProcessEvent")
.expect("valid process event");
let core = lash_core::ProcessEvent::try_from(remote).expect("core event");
assert_eq!(core.process_id, event.process_id);
assert_eq!(core.event_type, event.event_type);
assert_eq!(
serde_json::to_value(&core.semantics).expect("core semantics json"),
serde_json::to_value(&event.semantics).expect("event semantics json")
);
let observed = observed_process();
let response =
RemoteProcessListResponse::try_from(vec![observed.clone()]).expect("list response");
response.validate().expect("valid list response");
let core_observed =
Vec::<lash_core::ObservedProcess>::try_from(response).expect("core observed");
assert_eq!(core_observed[0].process_id, observed.process_id);
let snapshot = lash_core::ProcessWorkSnapshot {
session_id: "session-a".to_string(),
visible_process_ids: vec!["process:observed".to_string()],
items: vec![lash_core::ObservedWorkItem {
process: observed,
descriptor: lash_core::ProcessHandleDescriptor::new(Some("external"), Some("External")),
events: vec![lash_core::ObservedProcessEvent {
sequence: 1,
event_type: "process.yield".to_string(),
occurred_at_ms: 12,
payload: serde_json::json!({ "text": "hi" }),
}],
kind: "external".to_string(),
label: "External".to_string(),
}],
};
let remote = RemoteProcessWorkSnapshot::try_from(snapshot.clone()).expect("remote snapshot");
remote.validate().expect("valid snapshot");
let core = lash_core::ProcessWorkSnapshot::try_from(remote).expect("core snapshot");
assert_eq!(core.session_id, snapshot.session_id);
assert_eq!(core.items[0].process.process_id, "process:observed");
let start_result = RemoteProcessStartResult::try_from(process_record("process:start-result"))
.expect("start result");
let core = lash_core::ProcessRecord::try_from(start_result).expect("core start result");
assert_eq!(core.id, "process:start-result");
let cancel = RemoteProcessCancelResult::from(lash_core::ProcessCancelSummary {
process_id: "process:cancel".to_string(),
status: lash_core::ProcessLifecycleStatus::Cancelled,
});
let core = lash_core::ProcessCancelSummary::try_from(cancel).expect("core cancel summary");
assert_eq!(core.status, lash_core::ProcessLifecycleStatus::Cancelled);
let await_result = RemoteProcessAwaitResult::from((
"process:await".to_string(),
lash_core::ProcessAwaitOutput::Cancelled {
message: "stopped".to_string(),
raw: None,
control: None,
},
));
let (process_id, output) =
<(String, lash_core::ProcessAwaitOutput)>::try_from(await_result).expect("await result");
assert_eq!(process_id, "process:await");
assert!(matches!(
output,
lash_core::ProcessAwaitOutput::Cancelled { .. }
));
let events_response =
RemoteProcessEventsResponse::from(("process:record".to_string(), vec![event]));
let (process_id, events) = <(String, Vec<lash_core::ProcessEvent>)>::try_from(events_response)
.expect("events response");
assert_eq!(process_id, "process:record");
assert_eq!(events.len(), 1);
}
#[test]
fn process_list_cancel_signal_and_await_requests_convert_to_core_commands() {
let filter = lash_core::ProcessListFilter {
definition: Some(process_definition_identity("main")),
status: lash_core::ProcessStatusFilter::Any,
waiting: Some(true),
};
let remote = RemoteProcessListFilter::from(filter.clone());
remote.validate().expect("valid list filter");
let core = lash_core::ProcessListFilter::try_from(remote).expect("core filter");
assert_eq!(core.status, filter.status);
assert_eq!(core.waiting, filter.waiting);
assert!(core.definition.is_some());
let cancel = RemoteProcessCancelRequest {
protocol_version: REMOTE_PROTOCOL_VERSION,
process_id: "process:cancel".to_string(),
reason: Some("host requested".to_string()),
};
cancel.validate().expect("valid cancel");
let command = lash_core::ProcessCommand::from(cancel);
assert!(matches!(
command,
lash_core::ProcessCommand::Cancel { process_id, .. } if process_id == "process:cancel"
));
let signal = RemoteProcessSignalRequest {
protocol_version: REMOTE_PROTOCOL_VERSION,
process_id: "process:signal".to_string(),
signal_name: "ready".to_string(),
signal_id: "signal:1".to_string(),
payload: serde_json::json!({ "ok": true }),
replay_key: Some("signal-replay".to_string()),
wake_target_scope: Some(lash_core::SessionScope::new("session-a").into()),
};
let append =
lash_core::ProcessEventAppendRequest::try_from(signal.clone()).expect("append request");
assert_eq!(append.event_type, "signal.ready");
let command = lash_core::ProcessCommand::try_from(signal).expect("signal command");
assert!(matches!(
command,
lash_core::ProcessCommand::Signal { process_id, signal_name, signal_id, .. }
if process_id == "process:signal"
&& signal_name == "ready"
&& signal_id == "signal:1"
));
let await_request = RemoteProcessAwaitRequest {
protocol_version: REMOTE_PROTOCOL_VERSION,
process_id: "process:await".to_string(),
};
await_request.validate().expect("valid await");
let command = lash_core::ProcessCommand::from(await_request);
assert!(matches!(
command,
lash_core::ProcessCommand::Await { process_id } if process_id == "process:await"
));
}
#[test]
fn remote_turn_result_maps_core_semantics() {
let turn = lash_core::AssembledTurn {
state: Default::default(),
outcome: lash_core::TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage {
text: "done".to_string(),
}),
assistant_output: lash_core::AssistantOutput {
safe_text: "done".to_string(),
raw_text: "done".to_string(),
state: lash_core::OutputState::Usable,
},
execution: lash_core::ExecutionSummary {
had_tool_calls: true,
had_code_execution: false,
},
token_usage: lash_core::TokenUsage {
input_tokens: 1,
output_tokens: 2,
cached_input_tokens: 0,
reasoning_tokens: 0,
},
children_usage: vec![lash_core::TokenLedgerEntry {
source: "subagent".to_string(),
model: "m".to_string(),
usage: lash_core::TokenUsage {
input_tokens: 3,
output_tokens: 4,
cached_input_tokens: 0,
reasoning_tokens: 0,
},
}],
tool_calls: Vec::new(),
errors: Vec::new(),
};
let remote = RemoteTurnResult::from_core("session", "turn", turn, []);
remote.validate().expect("valid turn result");
assert_eq!(remote.status, RemoteTurnStatus::Completed);
assert_eq!(remote.usage.total.input_tokens, 4);
assert_eq!(remote.usage.total.output_tokens, 6);
}
#[test]
fn remote_tool_grants_validate_explicit_bindings_and_duplicates() {
let grant = demo_grant("one", "tools", "search");
grant.validate().expect("valid grant");
assert_eq!(
grant.binding_call_path(EXAMPLE_BINDING_KEY).unwrap(),
"tools.search"
);
let mut missing_binding = grant.clone();
missing_binding.bindings.remove(EXAMPLE_BINDING_KEY);
assert!(matches!(
missing_binding.binding_call_path(EXAMPLE_BINDING_KEY),
Err(RemoteProtocolError::MissingToolBinding { .. })
));
let duplicate = demo_grant("two", "tools", "search");
assert!(matches!(
RemoteToolGrant::validate_all(&[grant, duplicate]),
Err(RemoteProtocolError::DuplicateRemoteCallPath { .. })
));
}
#[test]
fn remote_tool_grants_convert_to_core_ids_without_binding_call_path() {
let grant = demo_grant("one", "tools", "search");
let definition = lash_core::ToolDefinition::try_from(&grant).expect("tool definition");
assert_eq!(definition.manifest().id.as_str(), "remote-tool:one");
assert_eq!(
definition.manifest().bindings[EXAMPLE_BINDING_KEY],
grant.bindings[EXAMPLE_BINDING_KEY],
"remote bindings remain opaque metadata on the manifest"
);
let changed_binding = demo_grant("one", "other_module", "other_operation");
assert_eq!(
changed_binding
.binding_call_path(EXAMPLE_BINDING_KEY)
.expect("changed binding call path"),
"other_module.other_operation"
);
let definition =
lash_core::ToolDefinition::try_from(&changed_binding).expect("tool definition");
assert_eq!(
definition.manifest().id.as_str(),
"remote-tool:one",
"derived core IDs are based on remote grant names, not binding call paths"
);
assert_ne!(
definition.manifest().id.as_str(),
"remote-tool:other_module.other_operation"
);
let mut explicit = grant;
explicit.id = Some("remote-tool:explicit".to_string());
let definition = lash_core::ToolDefinition::try_from(&explicit).expect("tool definition");
assert_eq!(definition.manifest().id.as_str(), "remote-tool:explicit");
}
#[test]
fn remote_activity_preserves_semantic_fields_and_collapses_runtime_diagnostics() {
let output = lash_core::ToolCallOutput::success(serde_json::json!({ "ok": true }));
let activity = lash_core::TurnActivity::new(
lash_core::TurnActivityId::new("corr"),
lash_core::TurnEvent::ToolCallCompleted {
call_id: Some("call".to_string()),
name: "demo".to_string(),
args: serde_json::json!({ "a": 1 }),
output,
duration_ms: 42,
},
);
let remote = RemoteTurnActivity::from_core(9, activity);
assert_eq!(remote.sequence, 9);
match remote.event {
RemoteTurnEvent::ToolCallCompleted {
call_id,
args,
duration_ms,
..
} => {
assert_eq!(call_id.as_deref(), Some("call"));
assert_eq!(args, serde_json::json!({ "a": 1 }));
assert_eq!(duration_ms, 42);
}
other => panic!("unexpected event: {other:?}"),
}
}
#[test]
fn remote_session_observation_from_core_maps_all_payload_variants() {
fn event(
payload: lash_core::SessionObservationEventPayload,
) -> lash_core::SessionObservationEvent {
let store = lash_core::InMemoryLiveReplayStore::default();
lash_core::LiveReplayStore::append(
&store,
"session",
lash_core::SessionRevision::new(4),
payload,
)
.expect("append observation event")
}
let activity =
lash_core::TurnActivity::independent(lash_core::TurnEvent::AssistantProseDelta {
text: "delta".to_string(),
});
let remote = RemoteSessionObservationEvent::from_core(
7,
event(lash_core::SessionObservationEventPayload::TurnActivity(
activity,
)),
);
match remote.event {
RemoteSessionObservationEventPayload::TurnActivity { activity } => {
assert_eq!(activity.sequence, 7);
}
other => panic!("unexpected payload: {other:?}"),
}
let read_view =
lash_core::SessionReadView::from_snapshot(&lash_core::SessionSnapshot::default());
let remote = RemoteSessionObservationEvent::from_core(
8,
event(lash_core::SessionObservationEventPayload::Committed { read_view }),
);
assert!(matches!(
remote.event,
RemoteSessionObservationEventPayload::Committed
));
let remote = RemoteSessionObservationEvent::from_core(
9,
event(
lash_core::SessionObservationEventPayload::AgentFrameSwitched {
frame_id: "frame-1".to_string(),
},
),
);
assert!(matches!(
remote.event,
RemoteSessionObservationEventPayload::AgentFrameSwitched { frame_id }
if frame_id == "frame-1"
));
let remote = RemoteSessionObservationEvent::from_core(
10,
event(lash_core::SessionObservationEventPayload::QueueChanged {
kind: lash_core::SessionQueueEventKind::Cancelled,
batch_ids: vec!["batch-1".to_string()],
}),
);
assert!(matches!(
remote.event,
RemoteSessionObservationEventPayload::QueueChanged { kind, batch_ids }
if kind == RemoteSessionQueueEventKind::Cancelled
&& batch_ids == vec!["batch-1".to_string()]
));
let remote = RemoteSessionObservationEvent::from_core(
11,
event(lash_core::SessionObservationEventPayload::ProcessChanged {
kind: lash_core::SessionProcessEventKind::Started,
process_ids: vec!["process-1".to_string()],
}),
);
assert!(matches!(
remote.event,
RemoteSessionObservationEventPayload::ProcessChanged { kind, process_ids }
if kind == RemoteSessionProcessEventKind::Started
&& process_ids == vec!["process-1".to_string()]
));
}
fn demo_grant(name: &str, module: &str, operation: &str) -> RemoteToolGrant {
RemoteToolGrant {
protocol_version: REMOTE_PROTOCOL_VERSION,
id: None,
name: name.to_string(),
description: "demo".to_string(),
input_schema: default_input_schema(),
output_schema: serde_json::Value::Null,
input_schema_projections: Vec::new(),
output_schema_projections: Vec::new(),
output_contract: RemoteToolOutputContract::Static,
examples: Vec::new(),
availability: None,
activation: None,
argument_projection: None,
scheduling: None,
retry_policy: None,
bindings: BTreeMap::from([(
EXAMPLE_BINDING_KEY.to_string(),
serde_json::json!({
"module_path": [module],
"operation": operation
}),
)]),
}
}
fn process_definition_identity(process_name: &str) -> serde_json::Value {
serde_json::json!({
"module_ref": "lashlang:v1:sha256:module",
"host_requirements_ref": "lashlang-host-requirements:v1:sha256:host",
"process_ref": {
"component": "process-component",
"pos": 1
},
"process_name": process_name
})
}
fn engine_process_input(process_name: &str, args: serde_json::Value) -> lash_core::ProcessInput {
let _ = process_name;
lash_core::ProcessInput::Engine {
kind: "lashlang".to_string(),
payload: serde_json::json!({
"args": args
}),
}
}
fn engine_process_identity(process_name: &str) -> lash_core::ProcessIdentity {
lash_core::ProcessIdentity::new("lashlang")
.with_label(Some(process_name.to_string()))
.with_definition(Some(process_definition_identity(process_name)))
}
fn trigger_target_identity() -> serde_json::Value {
process_definition_identity("on_button")
}
fn trigger_input_template() -> BTreeMap<String, lash_core::TriggerInputBinding> {
BTreeMap::from([
("event".to_string(), lash_core::TriggerInputBinding::Event),
(
"fixed".to_string(),
lash_core::TriggerInputBinding::Fixed {
value: serde_json::json!("blue"),
},
),
])
}
fn trigger_subscription_draft() -> lash_core::TriggerSubscriptionDraft {
lash_core::TriggerSubscriptionDraft {
registrant: lash_core::ProcessOriginator::session(lash_core::SessionScope::new(
"session-a",
)),
env_ref: lash_core::ProcessExecutionEnvRef::new(
"process-env:sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
),
wake_target: Some(lash_core::SessionScope::new("session-a")),
name: Some("button watcher".to_string()),
source_type: "ui.button.pressed".to_string(),
source_key: "source-key".to_string(),
source: serde_json::json!({ "button": "blue" }),
payload_schema: lash_core::LashSchema::any(),
target: engine_process_input("on_button", serde_json::json!({})),
target_identity: engine_process_identity("on_button"),
event_types: vec![process_event_type()],
input_template: trigger_input_template(),
target_label: Some("on_button".to_string()),
}
}
fn trigger_subscription_record() -> lash_core::TriggerSubscriptionRecord {
let draft = trigger_subscription_draft();
lash_core::TriggerSubscriptionRecord {
subscription_id: "subscription:1".to_string(),
registrant: draft.registrant,
env_ref: draft.env_ref,
wake_target: draft.wake_target,
handle: "trigger:1".to_string(),
name: draft.name,
source_type: draft.source_type,
source_key: draft.source_key,
source: draft.source,
payload_schema: draft.payload_schema,
target: draft.target,
target_identity: draft.target_identity,
event_types: draft.event_types,
input_template: draft.input_template,
target_label: draft.target_label,
enabled: true,
created_at_ms: 1,
updated_at_ms: 2,
}
}
fn assert_process_start_roundtrip(request: lash_core::ProcessStartRequest) {
let before = serde_json::to_value(&request).expect("request json");
let remote = RemoteProcessStartRequest::try_from(request).expect("remote start");
remote.validate().expect("valid remote start");
let core = lash_core::ProcessStartRequest::try_from(remote).expect("core start");
assert_eq!(serde_json::to_value(&core).expect("core json"), before);
}
fn process_event_type() -> lash_core::ProcessEventType {
lash_core::ProcessEventType {
name: "process.completed".to_string(),
payload_schema: lash_core::LashSchema::any(),
semantics: lash_core::ProcessEventSemanticsSpec {
terminal: Some(lash_core::ProcessTerminalSpec {
state: lash_core::ProcessTerminalState::Completed,
await_output: Some(lash_core::ProcessValueSelector::Pointer(
"/await_output".to_string(),
)),
}),
wake: Some(lash_core::ProcessWakeSpec {
when: None,
input: lash_core::ProcessValueSelector::Pointer("/text".to_string()),
dedupe_key: lash_core::ProcessWakeDedupeKey::Const("done".to_string()),
}),
},
}
}
fn process_record(process_id: &str) -> lash_core::ProcessRecord {
let registration = lash_core::ProcessRegistration::new(
process_id,
lash_core::ProcessInput::External {
metadata: serde_json::json!({ "label": "External" }),
},
lash_core::ProcessProvenance::host().with_caused_by(Some(
lash_core::CausalRef::TriggerOccurrence {
occurrence_id: "trigger:1".to_string(),
},
)),
)
.with_event_types([process_event_type()])
.with_wake_target(Some(lash_core::SessionScope::new("session-a")));
let mut record = lash_core::ProcessRecord::from_registration(registration);
record.external_ref = Some(lash_core::ProcessExternalRef {
backend: "worker".to_string(),
id: "external:1".to_string(),
metadata: Some(serde_json::json!({ "queue": "default" })),
});
record.wait = Some(lash_core::WaitState {
kind: lash_core::WaitKind::Signal {
name: "ready".to_string(),
event_type: "signal.ready".to_string(),
key: format!("process:{process_id}:signal.ready:1"),
ordinal: 1,
},
since_ms: 10,
});
record
}
fn process_event(process_id: &str) -> lash_core::ProcessEvent {
lash_core::ProcessEvent {
process_id: process_id.to_string(),
sequence: 1,
event_type: "process.completed".to_string(),
payload: serde_json::json!({ "await_output": { "type": "success", "value": true } }),
invocation: lash_core::RuntimeInvocation::effect(
lash_core::runtime::RuntimeScope::for_turn("session-a", "turn-a", 1, 0),
"effect:1",
lash_core::RuntimeEffectKind::Process,
"replay:1",
)
.with_caused_by(Some(lash_core::CausalRef::Process {
process_id: process_id.to_string(),
})),
semantics: lash_core::runtime::ProcessEventSemantics {
terminal: Some(lash_core::ProcessTerminalSemantics {
state: lash_core::ProcessTerminalState::Completed,
await_output: lash_core::ProcessAwaitOutput::Success {
value: serde_json::json!(true),
control: None,
},
}),
wake: Some(lash_core::ProcessWake {
input: "wake".to_string(),
dedupe_key: "dedupe".to_string(),
}),
},
occurred_at: lash_core::system_time_from_epoch_ms(12),
}
}
fn observed_process() -> lash_core::ObservedProcess {
lash_core::ObservedProcess {
process_id: "process:observed".to_string(),
graph_key: "process:process:observed".to_string(),
kind: "external".to_string(),
identity: lash_core::ProcessIdentity::new("external")
.with_label(Some("External".to_string())),
lifecycle: lash_core::ProcessLifecycleStatus::Running,
status_label: "running".to_string(),
terminal: false,
error: None,
created_at_ms: 1,
updated_at_ms: 2,
input: lash_core::ProcessInput::External {
metadata: serde_json::json!({ "label": "External" }),
},
originator: lash_core::ProcessOriginator::host(),
env_ref: None,
wake_target: Some(lash_core::SessionScope::new("session-a")),
caused_by: None,
external_ref: None,
wait: None,
child_session_id: None,
label: "External".to_string(),
}
}