mod decision;
mod observe;
mod verify;
#[cfg(test)]
pub mod mock_control;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::core::sm::control::{LaunchParams, SessionControl};
use crate::core::sm::goals::{GoalStatus, SessionLink, SessionUpdate, SmGoalError, SmGoalStore};
use crate::core::sm::providers::{LlmRequest, SmModelTier};
use super::SessionManagerAgent;
pub use decision::{SmDecision, TaskSpec, parse_decision};
use observe::interpret_session;
use verify::redirect_direct_work;
const DECOMPOSE_MAX_TOKENS: u32 = 2_048;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DelegationOutcome {
pub reply: String,
pub goal_id: String,
pub launched: Vec<String>,
pub goal_done: bool,
pub goal_status: String,
}
#[derive(Debug, thiserror::Error)]
pub enum DelegationError {
#[error("{0}")]
Degraded(String),
#[error("session-manager goal store failed: {0}")]
Goal(#[from] SmGoalError),
}
impl SessionManagerAgent {
pub async fn delegate_goal(
&self,
message: &str,
control: &Arc<dyn SessionControl>,
goals: &Arc<Mutex<SmGoalStore>>,
) -> Result<DelegationOutcome, DelegationError> {
let goal_id = {
let mut store = goals.lock().await;
store.create(message.to_string(), Vec::new()).await?.id
};
let decision = self.decompose(message, &goal_id, goals).await?;
let tasks = match decision {
SmDecision::Delegate { tasks } => tasks,
SmDecision::Respond { message } => {
let goal_status = self.goal_status_label(&goal_id, goals).await;
return Ok(DelegationOutcome {
reply: message,
goal_id,
launched: Vec::new(),
goal_done: false,
goal_status,
});
}
SmDecision::DoWork { summary } => {
let reply = redirect_direct_work(&summary);
let goal_status = {
let mut store = goals.lock().await;
let _ = store.note(&goal_id, reply.clone()).await;
let _ = store.set_status(&goal_id, GoalStatus::Blocked).await;
store
.get(&goal_id)
.map(|g| g.status.label().to_string())
.unwrap_or_else(|| GoalStatus::Blocked.label().to_string())
};
return Ok(DelegationOutcome {
reply,
goal_id,
launched: Vec::new(),
goal_done: false,
goal_status,
});
}
};
let launched = self.launch_tasks(&tasks, &goal_id, control, goals).await;
if launched.is_empty() {
let goal_status = {
let mut store = goals.lock().await;
let _ = store
.note(
&goal_id,
"decompose produced no launchable task".to_string(),
)
.await;
store
.get(&goal_id)
.map(|g| g.status.label().to_string())
.unwrap_or_else(|| GoalStatus::Pending.label().to_string())
};
return Ok(DelegationOutcome {
reply: "No launchable task was produced for this goal; nothing delegated."
.to_string(),
goal_id,
launched,
goal_done: false,
goal_status,
});
}
self.observe_and_verify(&launched, &goal_id, control, goals)
.await?;
let goal_done = {
let mut store = goals.lock().await;
match store.close(&goal_id).await {
Ok(_) => true,
Err(SmGoalError::VerificationGate { .. }) => false,
Err(e) => {
tracing::warn!(%goal_id, "delegate: gated close failed (not a gate rejection): {e}");
false
}
}
};
let reply = self.report(&goal_id, &launched, goal_done, goals).await;
let goal_status = self.goal_status_label(&goal_id, goals).await;
Ok(DelegationOutcome {
reply,
goal_id,
launched,
goal_done,
goal_status,
})
}
async fn decompose(
&self,
message: &str,
goal_id: &str,
goals: &Arc<Mutex<SmGoalStore>>,
) -> Result<SmDecision, DelegationError> {
let runtime = self
.runtime_ref()
.ok_or_else(|| DelegationError::Degraded(super::chat::degraded_notice()))?;
let resolved = runtime
.resolver
.resolve(&self.config.inference, SmModelTier::Orchestration)
.await
.map_err(|e| {
if e.is_degraded() {
DelegationError::Degraded(super::chat::degraded_notice())
} else {
DelegationError::Degraded(e.to_string())
}
})?;
let recall = self.delegate_recall(runtime, message).await;
let system = decision_system_prompt();
let user = decision_user_prompt(message, goal_id, recall.as_deref());
let req = LlmRequest {
model: resolved.model.clone(),
system,
messages: vec![crate::core::sm::providers::ChatMessage {
role: "user".to_string(),
content: user,
}],
temperature: self.config.inference.temperature,
max_tokens: DECOMPOSE_MAX_TOKENS,
};
let response = resolved
.provider
.complete(req)
.await
.map_err(|e| DelegationError::Degraded(e.to_string()))?;
let decision = parse_decision(&response.text);
{
let mut store = goals.lock().await;
let _ = store
.note(goal_id, format!("decompose decision: {decision:?}"))
.await;
}
Ok(decision)
}
async fn launch_tasks(
&self,
tasks: &[TaskSpec],
goal_id: &str,
control: &Arc<dyn SessionControl>,
goals: &Arc<Mutex<SmGoalStore>>,
) -> Vec<String> {
let mut launched = Vec::new();
for task in tasks.iter().filter(|t| t.is_launchable()) {
let params = LaunchParams {
workdir: task.workdir.clone(),
model: task.model.clone(),
prompt: Some(task.prompt.clone()),
goal_id: Some(goal_id.to_string()),
};
let result = match control.launch(params).await {
Ok(v) => v,
Err(e) => {
tracing::warn!(%goal_id, "delegate: launch failed for a task: {e}");
let mut store = goals.lock().await;
let _ = store
.note(goal_id, format!("launch failed for a task: {e}"))
.await;
continue;
}
};
let session_id = result
.get("session_id")
.and_then(serde_json::Value::as_str)
.unwrap_or_default()
.to_string();
if session_id.is_empty() {
tracing::warn!(
%goal_id,
"delegate: launch returned no session_id; skipping link+delivery for this task"
);
let mut store = goals.lock().await;
let _ = store
.note(goal_id, "launch returned no session_id".to_string())
.await;
continue;
}
{
let mut store = goals.lock().await;
if let Err(e) = store
.link(goal_id, SessionLink::launched(&session_id, &task.prompt))
.await
{
tracing::warn!(%session_id, %goal_id, "delegate: goal link failed: {e}");
}
}
if let Err(e) = control.send(&session_id, &task.prompt).await {
tracing::warn!(%session_id, "delegate: task delivery failed: {e}");
let mut store = goals.lock().await;
let _ = store
.note(
goal_id,
format!("task delivery to {session_id} failed: {e}"),
)
.await;
}
launched.push(session_id);
}
launched
}
async fn observe_and_verify(
&self,
launched: &[String],
goal_id: &str,
control: &Arc<dyn SessionControl>,
goals: &Arc<Mutex<SmGoalStore>>,
) -> Result<(), DelegationError> {
for session_id in launched {
let observed = match control.get(session_id).await {
Ok(json) => interpret_session(&json),
Err(e) => {
let mut store = goals.lock().await;
let _ = store
.note(goal_id, format!("observe {session_id} failed: {e}"))
.await;
continue;
}
};
let update = SessionUpdate {
session_id: session_id.clone(),
state: Some(observed.state),
evidence: observed.evidence.clone(),
note: observed
.evidence
.as_ref()
.map(|e| format!("verified {session_id}: {e}")),
};
let mut store = goals.lock().await;
if let Err(e) = store.update(goal_id, update).await {
tracing::warn!(%session_id, %goal_id, "delegate: goal update failed: {e}");
}
}
Ok(())
}
async fn goal_status_label(&self, goal_id: &str, goals: &Arc<Mutex<SmGoalStore>>) -> String {
let store = goals.lock().await;
store
.get(goal_id)
.map(|g| g.status.label().to_string())
.unwrap_or_else(|| GoalStatus::Pending.label().to_string())
}
async fn report(
&self,
goal_id: &str,
launched: &[String],
goal_done: bool,
goals: &Arc<Mutex<SmGoalStore>>,
) -> String {
let store = goals.lock().await;
let Some(goal) = store.get(goal_id) else {
return format!("Goal {goal_id}: launched {} session(s).", launched.len());
};
let verified = goal
.sessions
.iter()
.filter(|s| s.state.is_verified())
.count();
let total = goal.sessions.len();
if goal_done {
format!(
"Goal {goal_id} DONE: {verified}/{total} task(s) verified with observed evidence. \
Launched {} session(s).",
launched.len()
)
} else {
format!(
"Goal {goal_id} in progress: {verified}/{total} task(s) verified \
(not yet done — verification evidence required for the rest). \
Launched {} session(s).",
launched.len()
)
}
}
}
fn decision_system_prompt() -> String {
let base = crate::core::sm::prompt::resolve_sm_prompt_default();
format!("{base}\n\n---\n\n{DECISION_INSTRUCTIONS}")
}
const DECISION_INSTRUCTIONS: &str = "\
# DECISION (machine-read)
Reply with EXACTLY ONE JSON object (optionally inside a ```json fence) and no
other commands. Choose ONE action:
- To delegate real work (the default — you have no hands of your own):
{\"action\":\"delegate\",\"tasks\":[{\"workdir\":\"<repo-or-dir>\",\"prompt\":\"<task>\",\"model\":\"<optional>\"}]}
One task = one launched session. Split a goal into session-sized tasks.
- To talk to the operator (ask a question / give status — Allowlist 1):
{\"action\":\"respond\",\"message\":\"<operator-facing text>\"}
You MUST NOT do the work yourself (SP1-SP5): never edit files, read project
source, run builds/tests, or answer a work request from your own knowledge.
If you are tempted to do the work directly, emit a delegate action instead.";
fn decision_user_prompt(message: &str, goal_id: &str, recall: Option<&str>) -> String {
let recall_block = match recall {
Some(r) if !r.trim().is_empty() => format!("\n\nRelevant prior context:\n{r}"),
_ => String::new(),
};
format!(
"Operator goal ({goal_id}): {message}{recall_block}\n\n\
Decompose this goal into session-sized tasks and reply with your decision."
)
}
#[cfg(test)]
#[path = "delegate_tests.rs"]
mod delegate_tests;