pub(in super::super) use roboticus_pipeline::run::run_pipeline;
#[cfg(test)]
mod tests {
use roboticus_pipeline::config::*;
use roboticus_pipeline::context::inference::annotate_mcp_calls_from_react_trace;
use roboticus_pipeline::decomposition::{
DecompositionDecision, DelegationPlan, SpecialistProposal,
};
use roboticus_pipeline::flight_recorder::{ReactStep, ReactTrace, ToolSource};
use roboticus_pipeline::stage_deps::TaskStateDeps;
use roboticus_pipeline::task_state::{
build_task_state_input, resolve_specialist_creation_for_task,
};
use roboticus_pipeline::trace::{PipelineTrace, SpanOutcome};
#[test]
fn api_preset_enables_all_core_features() {
let cfg = PipelineConfig::api();
assert!(cfg.injection_defense);
assert!(cfg.dedup_tracking);
assert!(cfg.decomposition_gate);
assert!(cfg.delegated_execution);
assert!(cfg.shortcuts_enabled);
assert!(cfg.cache_enabled);
assert!(cfg.post_turn_ingest);
assert!(cfg.nickname_refinement);
assert!(cfg.inject_diagnostics);
assert!(!cfg.specialist_controls); assert_eq!(cfg.inference_mode, InferenceMode::Standard);
assert_eq!(cfg.guard_set, GuardSetPreset::Full);
assert_eq!(cfg.cache_guard_set, GuardSetPreset::Cached);
assert_eq!(cfg.authority_mode, AuthorityMode::ApiClaim);
assert_eq!(cfg.channel_label, "api");
assert_eq!(cfg.session_resolution, SessionResolutionMode::FromBody);
}
#[tokio::test]
async fn auto_composition_creates_missing_skills_and_subagent_for_task_turn() {
let state = crate::api::routes::tests::test_state();
{
let mut cfg = state.config.write().await;
cfg.agent.composition_policy = roboticus_core::config::CompositionPolicy::Autonomous;
}
let proposal = SpecialistProposal {
name: "finance-specialist".into(),
display_name: "Finance Specialist".into(),
description: "Handles finance planning tasks".into(),
skills: vec!["forecasting".into(), "pricing".into()],
model: "auto".into(),
};
let decision = DecompositionDecision::RequiresSpecialistCreation {
proposal,
rationale: "no specialist fit".into(),
};
let mut trace = PipelineTrace::new("turn-auto-compose", "api");
let am_deps = roboticus_pipeline::stage_deps::ActionMappingDeps {
core: &state,
reasoning: &state,
tooling: &state,
tool_executor: &state,
};
let (resolved, workflow_note) = resolve_specialist_creation_for_task(
&am_deps,
"turn-auto-compose",
"api",
roboticus_core::InputAuthority::Creator,
"session-auto-compose",
"Research pricing and forecast revenue and draft a rollout plan",
decision,
&mut trace,
)
.await;
assert!(!matches!(
resolved,
DecompositionDecision::RequiresSpecialistCreation { .. }
));
assert!(
workflow_note.is_some()
|| matches!(resolved, DecompositionDecision::Centralized { .. })
);
let skills = roboticus_db::skills::list_skills(&state.db).expect("list skills");
assert!(skills.iter().any(|s| s.name == "forecasting"));
assert!(skills.iter().any(|s| s.name == "pricing"));
let agents = roboticus_db::agents::list_sub_agents(&state.db).expect("list subagents");
let subagent = agents
.iter()
.find(|a| a.name == "finance-specialist")
.expect("finance specialist created");
let skills_json = subagent.skills_json.as_deref().unwrap_or("[]");
assert!(skills_json.contains("forecasting"));
assert!(skills_json.contains("pricing"));
}
#[test]
fn planner_selects_compose_subagent_for_explicit_empty_roster_workflow() {
use roboticus_agent::action_planner::PlannedAction;
use roboticus_agent::task_state::{TaskClassification, TaskStateInput};
let input = TaskStateInput {
user_content:
"Introspect what you need, compose a specialist, then delegate this task.".into(),
intents: vec!["Delegation".into()],
authority: "Creator".into(),
retrieval_metrics: None,
tool_search_stats: None,
mcp_tools_available: false,
taskable_agent_count: 0,
fit_agent_count: 0,
fit_agent_names: vec![],
enabled_skill_count: 0,
matching_skill_count: 0,
missing_skills: vec![],
remaining_budget_tokens: 8000,
provider_breaker_open: false,
inference_mode: "standard".into(),
decomposition_proposal: None,
explicit_specialist_workflow: true,
named_tool_match: false,
recent_response_skeletons: vec![],
recent_user_message_lengths: vec![],
self_echo_fragments: vec![],
declared_action: None,
previous_turn_had_protocol_issues: false,
normalization_retry_streak: 0,
};
let state = roboticus_agent::task_state::synthesize(&input);
let plan = roboticus_agent::action_planner::plan(&state, &input);
assert_eq!(state.classification, TaskClassification::Task);
assert_eq!(state.roster_fit.taskable_count, 0);
assert_eq!(plan.selected, PlannedAction::ComposeSubagent);
}
#[test]
fn planner_selects_answer_directly_for_conversation() {
use roboticus_agent::action_planner::PlannedAction;
use roboticus_agent::task_state::{TaskClassification, TaskStateInput};
let input = TaskStateInput {
user_content: "Thanks, that makes sense.".into(),
intents: vec![],
authority: "Creator".into(),
retrieval_metrics: None,
tool_search_stats: None,
mcp_tools_available: false,
taskable_agent_count: 0,
fit_agent_count: 0,
fit_agent_names: vec![],
enabled_skill_count: 0,
matching_skill_count: 0,
missing_skills: vec![],
remaining_budget_tokens: 8000,
provider_breaker_open: false,
inference_mode: "standard".into(),
decomposition_proposal: None,
explicit_specialist_workflow: false,
named_tool_match: false,
recent_response_skeletons: vec![],
recent_user_message_lengths: vec![],
self_echo_fragments: vec![],
declared_action: None,
previous_turn_had_protocol_issues: false,
normalization_retry_streak: 0,
};
let state = roboticus_agent::task_state::synthesize(&input);
let plan = roboticus_agent::action_planner::plan(&state, &input);
assert_eq!(state.classification, TaskClassification::Conversation);
assert_eq!(plan.selected, PlannedAction::AnswerDirectly);
}
#[test]
fn build_task_state_input_compose_subagent_with_centralized_gate() {
use roboticus_agent::action_planner::PlannedAction;
use roboticus_agent::task_state::TaskStateInput;
let input = TaskStateInput {
user_content: "Compose a specialist and delegate this task".into(),
intents: vec!["Delegation".into()],
authority: "Creator".into(),
retrieval_metrics: None,
tool_search_stats: None,
mcp_tools_available: false,
taskable_agent_count: 0,
fit_agent_count: 0,
fit_agent_names: vec![],
enabled_skill_count: 10,
matching_skill_count: 0,
missing_skills: vec![],
remaining_budget_tokens: 8000,
provider_breaker_open: false,
inference_mode: "standard".into(),
decomposition_proposal: Some(roboticus_agent::task_state::DecompositionProposal {
should_delegate: false,
rationale: "single-step".into(),
utility_margin: -0.1,
}),
explicit_specialist_workflow: true,
named_tool_match: false,
recent_response_skeletons: vec![],
recent_user_message_lengths: vec![],
self_echo_fragments: vec![],
declared_action: None,
previous_turn_had_protocol_issues: false,
normalization_retry_streak: 0,
};
let state = roboticus_agent::task_state::synthesize(&input);
let plan = roboticus_agent::action_planner::plan(&state, &input);
assert_eq!(plan.selected, PlannedAction::ComposeSubagent);
}
#[test]
fn build_task_state_input_conversation_is_non_compositional() {
use roboticus_agent::action_planner::PlannedAction;
use roboticus_agent::task_state::TaskStateInput;
let input = TaskStateInput {
user_content: "Thanks".into(),
intents: vec![],
authority: "Creator".into(),
retrieval_metrics: None,
tool_search_stats: None,
mcp_tools_available: false,
taskable_agent_count: 0,
fit_agent_count: 0,
fit_agent_names: vec![],
enabled_skill_count: 10,
matching_skill_count: 0,
missing_skills: vec![],
remaining_budget_tokens: 8000,
provider_breaker_open: false,
inference_mode: "standard".into(),
decomposition_proposal: None,
explicit_specialist_workflow: false,
named_tool_match: false,
recent_response_skeletons: vec![],
recent_user_message_lengths: vec![],
self_echo_fragments: vec![],
declared_action: None,
previous_turn_had_protocol_issues: false,
normalization_retry_streak: 0,
};
let state = roboticus_agent::task_state::synthesize(&input);
let plan = roboticus_agent::action_planner::plan(&state, &input);
assert_eq!(plan.selected, PlannedAction::AnswerDirectly);
}
#[tokio::test]
async fn build_task_state_input_prefers_delegation_for_explicit_matching_specialists() {
use roboticus_agent::action_planner::PlannedAction;
use roboticus_agent::task_state::TaskClassification;
let state = crate::api::routes::tests::test_state();
let agent = roboticus_db::agents::SubAgentRow {
id: "test-revenue-strategist".into(),
name: "revenue-strategist".into(),
display_name: Some("Revenue Strategist".into()),
model: "auto".into(),
fallback_models_json: Some("[]".into()),
role: "Subagent".into(),
description: Some("Handles freelancer revenue strategy".into()),
skills_json: Some(r#"["revenue","pricing","freelancers"]"#.into()),
enabled: true,
session_count: 0,
last_used_at: None,
};
roboticus_db::agents::upsert_sub_agent(&state.db, &agent).expect("seed subagent");
let intents = vec![roboticus_pipeline::intent_registry::Intent::Delegation];
let gate_decision = DecompositionDecision::Delegated(DelegationPlan {
subtasks: vec!["test task".into()],
rationale: "test delegation".into(),
expected_utility_margin: 0.5,
});
let task_deps = TaskStateDeps {
core: &state,
reasoning: &state,
tooling: &state,
};
let task_input = build_task_state_input(
&task_deps,
"test-session",
"revenue pricing",
&intents,
roboticus_core::InputAuthority::Creator,
Some(&gate_decision),
"standard",
)
.await;
let task_state = roboticus_agent::task_state::synthesize(&task_input);
let plan = roboticus_agent::action_planner::plan(&task_state, &task_input);
assert_eq!(task_state.classification, TaskClassification::Task);
assert_eq!(task_state.roster_fit.taskable_count, 1);
assert_eq!(task_state.roster_fit.fit_count, 1);
assert!(
matches!(
plan.selected,
PlannedAction::DelegateToSpecialist | PlannedAction::ComposeSkill
),
"planner should act on roster fit: got {:?}",
plan.selected
);
}
#[tokio::test]
async fn build_task_state_input_counts_name_and_description_tokens_for_specialist_fit() {
use roboticus_agent::action_planner::PlannedAction;
use roboticus_agent::task_state::TaskClassification;
let state = crate::api::routes::tests::test_state();
let agent = roboticus_db::agents::SubAgentRow {
id: "test-saas-ideator".into(),
name: "saas-ideator".into(),
display_name: Some("SaaS Ideator".into()),
model: "auto".into(),
fallback_models_json: Some("[]".into()),
role: "Subagent".into(),
description: Some("Generates SaaS ideas for freelancers".into()),
skills_json: Some(r#"["typescript","python"]"#.into()),
enabled: true,
session_count: 0,
last_used_at: None,
};
roboticus_db::agents::upsert_sub_agent(&state.db, &agent).expect("seed subagent");
let intents = vec![roboticus_pipeline::intent_registry::Intent::Delegation];
let gate_decision = DecompositionDecision::Delegated(DelegationPlan {
subtasks: vec!["test task".into()],
rationale: "test delegation".into(),
expected_utility_margin: 0.5,
});
let task_deps = TaskStateDeps {
core: &state,
reasoning: &state,
tooling: &state,
};
let task_input = build_task_state_input(
&task_deps,
"test-session",
"saas ideas freelancers",
&intents,
roboticus_core::InputAuthority::Creator,
Some(&gate_decision),
"standard",
)
.await;
let task_state = roboticus_agent::task_state::synthesize(&task_input);
let plan = roboticus_agent::action_planner::plan(&task_state, &task_input);
assert_eq!(task_state.classification, TaskClassification::Task);
assert_eq!(task_state.roster_fit.taskable_count, 1);
assert_eq!(task_state.roster_fit.fit_count, 1);
assert!(
matches!(
plan.selected,
PlannedAction::DelegateToSpecialist | PlannedAction::ComposeSkill
),
"planner should act on roster fit: got {:?}",
plan.selected
);
}
#[tokio::test]
async fn build_task_state_input_treats_explicit_specialist_workflow_as_task() {
use roboticus_agent::task_state::TaskClassification;
let state = crate::api::routes::tests::test_state();
let intents = vec![roboticus_pipeline::intent_registry::Intent::Delegation];
let task_deps = TaskStateDeps {
core: &state,
reasoning: &state,
tooling: &state,
};
let task_input = build_task_state_input(
&task_deps,
"test-session",
"delegate this to a specialist",
&intents,
roboticus_core::InputAuthority::Creator,
None,
"standard",
)
.await;
let task_state = roboticus_agent::task_state::synthesize(&task_input);
assert_eq!(task_state.classification, TaskClassification::Task);
}
#[test]
fn build_task_state_input_delegates_when_explicit_workflow_and_fit_agents() {
use roboticus_agent::action_planner::PlannedAction;
use roboticus_agent::task_state::{DecompositionProposal, TaskStateInput};
let input = TaskStateInput {
user_content:
"Use the best existing specialists if they fit, otherwise compose what is missing."
.into(),
intents: vec!["Delegation".into()],
authority: "Creator".into(),
retrieval_metrics: None,
tool_search_stats: None,
mcp_tools_available: false,
taskable_agent_count: 2,
fit_agent_count: 1,
fit_agent_names: vec!["revenue-strategist".into()],
enabled_skill_count: 10,
matching_skill_count: 0,
missing_skills: vec![],
remaining_budget_tokens: 8000,
provider_breaker_open: false,
inference_mode: "standard".into(),
decomposition_proposal: Some(DecompositionProposal {
should_delegate: false,
rationale: "single-step".into(),
utility_margin: -0.1,
}),
explicit_specialist_workflow: true,
named_tool_match: false,
recent_response_skeletons: vec![],
recent_user_message_lengths: vec![],
self_echo_fragments: vec![],
declared_action: None,
previous_turn_had_protocol_issues: false,
normalization_retry_streak: 0,
};
let state = roboticus_agent::task_state::synthesize(&input);
let plan = roboticus_agent::action_planner::plan(&state, &input);
assert!(
matches!(
plan.selected,
PlannedAction::DelegateToSpecialist | PlannedAction::ComposeSkill
),
"planner should act on roster fit: got {:?}",
plan.selected
);
}
#[test]
fn streaming_preset_has_full_feature_parity() {
let cfg = PipelineConfig::streaming();
assert!(cfg.injection_defense);
assert!(cfg.dedup_tracking);
assert!(cfg.decomposition_gate);
assert!(cfg.delegated_execution);
assert!(cfg.shortcuts_enabled);
assert!(!cfg.specialist_controls);
assert_eq!(cfg.inference_mode, InferenceMode::Streaming);
assert_eq!(cfg.guard_set, GuardSetPreset::Streaming);
assert_eq!(cfg.cache_guard_set, GuardSetPreset::None);
assert!(!cfg.nickname_refinement);
assert!(cfg.post_turn_ingest);
assert!(cfg.cache_enabled);
assert_eq!(cfg.authority_mode, AuthorityMode::ApiClaim);
assert_eq!(cfg.channel_label, "api-stream");
}
#[test]
fn channel_preset_enables_specialist_controls() {
let cfg = PipelineConfig::channel("telegram");
assert!(cfg.injection_defense);
assert!(cfg.dedup_tracking);
assert!(cfg.decomposition_gate);
assert!(cfg.delegated_execution);
assert!(cfg.shortcuts_enabled);
assert!(cfg.specialist_controls); assert!(cfg.cache_enabled);
assert!(cfg.post_turn_ingest);
assert!(!cfg.nickname_refinement); assert!(!cfg.inject_diagnostics); assert_eq!(cfg.inference_mode, InferenceMode::Standard);
assert_eq!(cfg.guard_set, GuardSetPreset::Full);
assert_eq!(cfg.cache_guard_set, GuardSetPreset::Cached);
assert_eq!(cfg.authority_mode, AuthorityMode::ChannelClaim);
assert_eq!(cfg.channel_label, "telegram");
assert_eq!(
cfg.session_resolution,
SessionResolutionMode::FromChannel {
platform: "telegram".into()
}
);
}
#[test]
fn channel_preset_uses_platform_as_label() {
let telegram = PipelineConfig::channel("telegram");
assert_eq!(telegram.channel_label, "telegram");
let discord = PipelineConfig::channel("discord");
assert_eq!(discord.channel_label, "discord");
let email = PipelineConfig::channel("email");
assert_eq!(email.channel_label, "email");
}
#[test]
fn cron_preset_has_injection_defense() {
let cfg = PipelineConfig::cron();
assert!(cfg.injection_defense);
assert!(!cfg.dedup_tracking);
assert!(cfg.decomposition_gate);
assert!(!cfg.delegated_execution); assert!(!cfg.shortcuts_enabled); assert!(!cfg.specialist_controls);
assert!(cfg.cache_enabled);
assert!(cfg.post_turn_ingest);
assert!(!cfg.nickname_refinement);
assert!(!cfg.inject_diagnostics);
assert_eq!(cfg.inference_mode, InferenceMode::Standard);
assert_eq!(cfg.guard_set, GuardSetPreset::Full);
assert_eq!(cfg.cache_guard_set, GuardSetPreset::Cached);
assert_eq!(cfg.authority_mode, AuthorityMode::SelfGenerated);
assert_eq!(cfg.channel_label, "cron");
assert_eq!(cfg.session_resolution, SessionResolutionMode::Dedicated);
}
#[test]
fn mcp_react_steps_annotate_pipeline_trace() {
let mut pipeline_trace = PipelineTrace::new("turn-mcp", "api");
pipeline_trace.begin_stage("inference");
let mut react_trace = ReactTrace::new("turn-mcp");
react_trace.record(ReactStep::ToolCall {
tool_name: "github::create_issue".into(),
parameters_redacted: false,
result_summary: "created".into(),
duration_ms: 275,
success: true,
source: ToolSource::Mcp {
server: "github".into(),
},
});
annotate_mcp_calls_from_react_trace(&mut pipeline_trace, &react_trace);
pipeline_trace.end_stage(SpanOutcome::Ok);
let span = &pipeline_trace.stages[0];
assert_eq!(
span.annotations.get("mcp.server"),
Some(&serde_json::json!("github"))
);
assert_eq!(
span.annotations.get("mcp.tool"),
Some(&serde_json::json!("github::create_issue"))
);
assert_eq!(
span.annotations.get("mcp.duration_ms"),
Some(&serde_json::json!(275))
);
assert_eq!(
span.annotations.get("mcp.success"),
Some(&serde_json::json!(true))
);
}
#[test]
fn guard_set_presets_resolve_to_non_empty_chains() {
let full = GuardSetPreset::Full.resolve();
assert!(!full.is_empty());
let cached = GuardSetPreset::Cached.resolve();
assert!(!cached.is_empty());
let streaming = GuardSetPreset::Streaming.resolve();
assert!(!streaming.is_empty());
}
#[test]
fn guard_set_none_resolves_to_empty_chain() {
let none = GuardSetPreset::None.resolve();
assert!(none.is_empty());
}
#[test]
fn api_predicates() {
let cfg = PipelineConfig::api();
assert!(cfg.is_standard_inference());
assert!(!cfg.is_streaming_inference());
assert!(cfg.enforces_authority());
assert!(cfg.can_execute_tools());
assert!(cfg.resolves_session_from_body());
assert!(!cfg.is_channel());
assert!(!cfg.is_cron());
}
#[test]
fn streaming_predicates() {
let cfg = PipelineConfig::streaming();
assert!(!cfg.is_standard_inference());
assert!(cfg.is_streaming_inference());
assert!(cfg.enforces_authority());
assert!(!cfg.can_execute_tools()); assert!(cfg.resolves_session_from_body());
assert!(!cfg.is_channel());
assert!(!cfg.is_cron());
}
#[test]
fn channel_predicates() {
let cfg = PipelineConfig::channel("telegram");
assert!(cfg.is_standard_inference());
assert!(!cfg.is_streaming_inference());
assert!(cfg.enforces_authority());
assert!(cfg.can_execute_tools());
assert!(!cfg.resolves_session_from_body());
assert!(cfg.is_channel());
assert!(!cfg.is_cron());
}
#[test]
fn cron_predicates() {
let cfg = PipelineConfig::cron();
assert!(cfg.is_standard_inference());
assert!(!cfg.is_streaming_inference());
assert!(!cfg.enforces_authority());
assert!(cfg.can_execute_tools());
assert!(!cfg.resolves_session_from_body());
assert!(!cfg.is_channel());
assert!(cfg.is_cron());
}
#[test]
fn all_presets_have_injection_defense() {
assert!(PipelineConfig::api().injection_defense);
assert!(PipelineConfig::streaming().injection_defense);
assert!(PipelineConfig::channel("test").injection_defense);
assert!(PipelineConfig::cron().injection_defense);
}
#[test]
fn api_and_streaming_share_pre_inference_stage_flags() {
let api = PipelineConfig::api();
let stream = PipelineConfig::streaming();
assert_eq!(api.decomposition_gate, stream.decomposition_gate);
assert_eq!(api.delegated_execution, stream.delegated_execution);
assert_eq!(api.shortcuts_enabled, stream.shortcuts_enabled);
assert_eq!(api.injection_defense, stream.injection_defense);
assert_eq!(api.skill_first_enabled, stream.skill_first_enabled);
assert_eq!(
api.short_followup_expansion,
stream.short_followup_expansion
);
}
#[test]
fn all_presets_have_post_turn_ingest() {
assert!(PipelineConfig::api().post_turn_ingest);
assert!(PipelineConfig::streaming().post_turn_ingest);
assert!(PipelineConfig::channel("test").post_turn_ingest);
assert!(PipelineConfig::cron().post_turn_ingest);
}
#[test]
fn standard_inference_paths_have_full_guards() {
let api = PipelineConfig::api();
let channel = PipelineConfig::channel("telegram");
let cron = PipelineConfig::cron();
for cfg in [&api, &channel, &cron] {
assert_eq!(cfg.inference_mode, InferenceMode::Standard);
assert_eq!(cfg.guard_set, GuardSetPreset::Full);
assert_eq!(cfg.cache_guard_set, GuardSetPreset::Cached);
}
}
#[test]
fn only_api_has_nickname_refinement() {
assert!(PipelineConfig::api().nickname_refinement);
assert!(!PipelineConfig::streaming().nickname_refinement);
assert!(!PipelineConfig::channel("test").nickname_refinement);
assert!(!PipelineConfig::cron().nickname_refinement);
}
#[test]
fn only_channel_has_specialist_controls() {
assert!(!PipelineConfig::api().specialist_controls);
assert!(!PipelineConfig::streaming().specialist_controls);
assert!(PipelineConfig::channel("test").specialist_controls);
assert!(!PipelineConfig::cron().specialist_controls);
}
#[test]
fn streaming_uses_streaming_inference_mode() {
let cfg = PipelineConfig::streaming();
assert_eq!(cfg.inference_mode, InferenceMode::Streaming);
assert!(cfg.shortcuts_enabled);
assert!(cfg.decomposition_gate);
assert!(cfg.delegated_execution);
}
#[test]
fn session_resolution_modes_are_correct() {
assert_eq!(
PipelineConfig::api().session_resolution,
SessionResolutionMode::FromBody
);
assert_eq!(
PipelineConfig::streaming().session_resolution,
SessionResolutionMode::FromBody
);
assert_eq!(
PipelineConfig::channel("discord").session_resolution,
SessionResolutionMode::FromChannel {
platform: "discord".into()
}
);
assert_eq!(
PipelineConfig::cron().session_resolution,
SessionResolutionMode::Dedicated
);
}
#[test]
fn provided_session_resolution_stores_id() {
let mode = SessionResolutionMode::Provided {
session_id: "test-session-123".into(),
};
match mode {
SessionResolutionMode::Provided { session_id } => {
assert_eq!(session_id, "test-session-123");
}
_ => panic!("expected Provided variant"),
}
}
}