use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use awaken_contract::contract::tool::{
Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
};
use super::manager::BackgroundTaskManager;
use super::state::BackgroundTaskStateKey;
/// Identifier of the send-message tool. It is passed as the first two arguments of
/// `ToolDescriptor::new` and as the tool id of the `ToolResult` returned by `execute`.
pub const SEND_MESSAGE_TOOL_ID: &str = "send_message";
/// Typed form of the `to` argument accepted by the send-message tool.
///
/// Serialized as an internally tagged object: the `relation` field carries the
/// snake_case variant name (`"parent"`, `"child"` or `"agent"`) and the
/// variant's own fields sit next to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "relation", rename_all = "snake_case")]
// The code visible in this module reads the raw JSON arguments instead of
// constructing this enum, hence the lint override.
#[allow(dead_code)]
pub enum RecipientRef {
    /// The parent of the current run.
    Parent,
    /// A child task, addressed by its name or its id.
    Child {
        /// Task name or task id.
        name: String,
    },
    /// An agent addressed through the thread it lives on.
    Agent {
        /// Thread to deliver the message to.
        thread_id: String,
        /// Optional agent on that thread; left out of the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        agent_id: Option<String>,
    },
}
/// Outcome of one send attempt, serialized as the tool's result payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendMessageReceipt {
    /// Id of the accepted message; the helpers in this module leave it empty on failure.
    pub message_id: String,
    /// Delivery status; the helpers in this module produce `"accepted"` or `"failed"`.
    pub status: &'static str,
    /// Failure description, present only for failed sends and omitted from the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
/// Reasons a message could not be handed over to its recipient.
///
/// Variant names are serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MessageError {
    /// No matching recipient could be resolved.
    RecipientNotFound,
    /// The sender is not allowed to message the recipient.
    PermissionDenied,
    /// The recipient exists but cannot take messages right now.
    RecipientUnavailable,
    /// The underlying transport reported an error; the payload is its description.
    TransportFailed(String),
}
/// Renders the error as the snake_case code that ends up in a receipt's `error` field.
impl std::fmt::Display for MessageError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the transport failure carries a payload; every other variant is a fixed code.
        let code = match self {
            Self::TransportFailed(detail) => return write!(f, "transport_failed: {detail}"),
            Self::RecipientNotFound => "recipient_not_found",
            Self::PermissionDenied => "permission_denied",
            Self::RecipientUnavailable => "recipient_unavailable",
        };
        f.write_str(code)
    }
}
/// Payload handed to a [`DurableMessageSink`] for delivery to another thread.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DurableMessageRequest {
    /// Thread the message is addressed to.
    pub recipient_thread_id: String,
    /// Specific agent on that thread, or `None` when no agent was named.
    pub recipient_agent_id: Option<String>,
    /// Agent id of the sender.
    pub sender_agent_id: String,
    /// Message text.
    pub message: String,
}
/// Transport used by the send-message tool for the `parent` and `agent` relations.
#[async_trait]
pub trait DurableMessageSink: Send + Sync {
    /// Delivers `request` to its recipient thread.
    ///
    /// The `Ok` string is used by the tool as the receipt's `message_id`; the
    /// `Err` string is reported to the caller as a `transport_failed` error.
    async fn send_agent_message(&self, request: DurableMessageRequest) -> Result<String, String>;
}
/// Tool that lets an agent message a child task, its parent, or another agent.
pub struct SendMessageTool {
    /// Used to deliver messages into the inbox of a child task.
    manager: Arc<BackgroundTaskManager>,
    /// Used to deliver messages to the parent thread or to another agent's thread.
    durable_sink: Arc<dyn DurableMessageSink>,
}
impl SendMessageTool {
    /// Creates the tool from the task manager (child delivery) and the durable
    /// sink (parent / agent delivery).
    pub fn new(
        manager: Arc<BackgroundTaskManager>,
        durable_sink: Arc<dyn DurableMessageSink>,
    ) -> Self {
        Self { manager, durable_sink }
    }

    /// Forwards one message to the durable sink and returns the sink's answer unchanged.
    ///
    /// An empty `recipient_agent_id` means "no specific agent" and is sent as `None`.
    async fn send_durable(
        &self,
        recipient_thread_id: &str,
        recipient_agent_id: &str,
        sender_agent_id: &str,
        message: &str,
    ) -> Result<String, String> {
        let target_agent = if recipient_agent_id.is_empty() {
            None
        } else {
            Some(recipient_agent_id.to_string())
        };
        let request = DurableMessageRequest {
            recipient_thread_id: recipient_thread_id.to_string(),
            recipient_agent_id: target_agent,
            sender_agent_id: sender_agent_id.to_string(),
            message: message.to_string(),
        };
        self.durable_sink.send_agent_message(request).await
    }

    /// Resolves `name` to the id of a non-terminal task owned by `owner_thread_id`.
    ///
    /// `name` is first tried as a key of the task table; if that entry is
    /// missing, owned by another thread, or already terminal, the table is
    /// scanned for a task whose `name` field equals it. Returns `None` when
    /// the background-task state is absent or nothing matches.
    fn resolve_child(
        &self,
        name: &str,
        owner_thread_id: &str,
        ctx: &ToolCallContext,
    ) -> Option<String> {
        let snapshot = ctx.state::<BackgroundTaskStateKey>()?;

        // Direct hit: `name` is itself a key of the task table.
        let direct = snapshot
            .tasks
            .get(name)
            .filter(|meta| meta.owner_thread_id == owner_thread_id && !meta.status.is_terminal())
            .map(|_| name.to_string());

        // Otherwise fall back to the first live, owned task carrying that name.
        direct.or_else(|| {
            snapshot
                .tasks
                .values()
                .find(|meta| {
                    meta.owner_thread_id == owner_thread_id
                        && !meta.status.is_terminal()
                        && meta.name.as_deref() == Some(name)
                })
                .map(|meta| meta.task_id.clone())
        })
    }

    /// Builds the receipt for a message that was accepted for delivery.
    fn make_receipt(message_id: String) -> SendMessageReceipt {
        SendMessageReceipt {
            message_id,
            status: "accepted",
            error: None,
        }
    }

    /// Builds the receipt for a failed send; the message id is left empty and
    /// the error is the `Display` rendering of `code`.
    fn make_error(code: MessageError) -> SendMessageReceipt {
        let reason = code.to_string();
        SendMessageReceipt {
            message_id: String::new(),
            status: "failed",
            error: Some(reason),
        }
    }
}
#[async_trait]
impl Tool for SendMessageTool {
    /// Describes the tool and the JSON schema of its arguments.
    ///
    /// `to` is one of three objects discriminated by `relation`: `parent`,
    /// `child` (with a `name`), or `agent` (with a `thread_id` and an optional
    /// `agent_id`). `message` is the text to deliver. Both are required.
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new(
            SEND_MESSAGE_TOOL_ID,
            SEND_MESSAGE_TOOL_ID,
            "Send a message to a child task, parent agent, or team member.",
        )
        .with_parameters(json!({
            "type": "object",
            "properties": {
                "to": {
                    "oneOf": [
                        {
                            "type": "object",
                            "properties": {
                                "relation": { "const": "parent" }
                            },
                            "required": ["relation"]
                        },
                        {
                            "type": "object",
                            "properties": {
                                "relation": { "const": "child" },
                                "name": { "type": "string", "description": "Task name or ID" }
                            },
                            "required": ["relation", "name"]
                        },
                        {
                            "type": "object",
                            "properties": {
                                "relation": { "const": "agent" },
                                "thread_id": { "type": "string" },
                                "agent_id": { "type": "string" }
                            },
                            "required": ["relation", "thread_id"]
                        }
                    ]
                },
                "message": { "type": "string" }
            },
            "required": ["to", "message"]
        }))
    }

    /// Checks the shape of the arguments.
    ///
    /// # Errors
    /// Returns `ToolError::InvalidArguments` when `to` or `to.relation` is
    /// missing, the relation is unknown, a `child` lacks a string `name`, an
    /// `agent` lacks a string `thread_id` or carries an `agent_id` that is
    /// neither a string nor `null`, or `message` is not a string.
    fn validate_args(&self, args: &Value) -> Result<(), ToolError> {
        let to = args
            .get("to")
            .ok_or_else(|| ToolError::InvalidArguments("missing 'to'".into()))?;
        let relation = to
            .get("relation")
            .and_then(Value::as_str)
            .ok_or_else(|| ToolError::InvalidArguments("missing 'to.relation'".into()))?;

        match relation {
            "child" => {
                if to.get("name").and_then(Value::as_str).is_none() {
                    return Err(ToolError::InvalidArguments("child requires 'name'".into()));
                }
            }
            "agent" => {
                if to.get("thread_id").and_then(Value::as_str).is_none() {
                    return Err(ToolError::InvalidArguments(
                        "agent requires 'thread_id'".into(),
                    ));
                }
                // `execute` treats a non-string `agent_id` as absent, which would
                // silently drop the agent targeting; reject it up front instead.
                // An explicit `null` is still accepted as "not provided".
                let malformed_agent_id = to
                    .get("agent_id")
                    .is_some_and(|v| !(v.is_null() || v.is_string()));
                if malformed_agent_id {
                    return Err(ToolError::InvalidArguments(
                        "agent 'agent_id' must be a string".into(),
                    ));
                }
            }
            "parent" => {}
            other => {
                return Err(ToolError::InvalidArguments(format!(
                    "unknown relation '{other}'"
                )));
            }
        }

        if args.get("message").and_then(Value::as_str).is_none() {
            return Err(ToolError::InvalidArguments("missing 'message'".into()));
        }
        Ok(())
    }

    /// Routes the message according to `to.relation` and returns a serialized
    /// `SendMessageReceipt`.
    ///
    /// * `child`: resolves the task owned by the calling thread and delivers
    ///   through the task manager's inbox.
    /// * `parent`: delivers through the durable sink to the run's parent
    ///   thread, or fails with `recipient_unavailable` when there is none.
    /// * `agent`: delivers through the durable sink to the given thread.
    ///
    /// Delivery failures are not returned as `Err`; they are reported inside
    /// the receipt (`status: "failed"` plus an `error` code). Missing or
    /// non-string fields fall back to empty strings, so callers are expected
    /// to run `validate_args` first.
    async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
        let to = &args["to"];
        let relation = to["relation"].as_str().unwrap_or_default();
        let message = args["message"].as_str().unwrap_or_default();
        let sender = &ctx.run_identity.agent_id;
        let thread_id = &ctx.run_identity.thread_id;

        let receipt = match relation {
            "child" => {
                let name = to["name"].as_str().unwrap_or_default();
                match self.resolve_child(name, thread_id, ctx) {
                    Some(task_id) => {
                        match self
                            .manager
                            .send_task_inbox_message(&task_id, thread_id, sender, message)
                            .await
                        {
                            // The inbox send returns no id, so one is minted here,
                            // and only when it is actually needed.
                            Ok(()) => Self::make_receipt(uuid::Uuid::now_v7().to_string()),
                            Err(e) => {
                                use super::manager::SendError;
                                Self::make_error(match e {
                                    SendError::TaskNotFound => MessageError::RecipientNotFound,
                                    SendError::NotOwner => MessageError::PermissionDenied,
                                    SendError::TaskTerminated(_)
                                    | SendError::InboxClosed
                                    | SendError::NoInbox => MessageError::RecipientUnavailable,
                                })
                            }
                        }
                    }
                    None => Self::make_error(MessageError::RecipientNotFound),
                }
            }
            "parent" => match ctx.run_identity.parent_thread_id.as_deref() {
                Some(parent_tid) => {
                    // Empty agent id: the parent is addressed by thread only.
                    match self.send_durable(parent_tid, "", sender, message).await {
                        Ok(dispatch_id) => Self::make_receipt(dispatch_id),
                        Err(e) => Self::make_error(MessageError::TransportFailed(e)),
                    }
                }
                None => Self::make_error(MessageError::RecipientUnavailable),
            },
            "agent" => {
                let target_thread = to["thread_id"].as_str().unwrap_or_default();
                let target_agent = to["agent_id"].as_str().unwrap_or_default();
                match self
                    .send_durable(target_thread, target_agent, sender, message)
                    .await
                {
                    Ok(dispatch_id) => Self::make_receipt(dispatch_id),
                    Err(e) => Self::make_error(MessageError::TransportFailed(e)),
                }
            }
            // Only reachable when `validate_args` was skipped.
            _ => Self::make_error(MessageError::RecipientNotFound),
        };

        Ok(ToolResult::success(
            SEND_MESSAGE_TOOL_ID,
            serde_json::to_value(&receipt).unwrap_or_default(),
        )
        .into())
    }
}
// Unit tests for the send-message tool: routing for each relation and argument validation.
#[cfg(test)]
mod tests {
use super::*;
use crate::extensions::background::{
BackgroundTaskPlugin, TaskParentContext, TaskResult as BgTaskResult,
};
use crate::state::StateStore;
use awaken_contract::contract::identity::RunIdentity;
use awaken_contract::registry_spec::AgentSpec;
use tokio::sync::Mutex;
// Test double for the durable sink: records every request it receives.
#[derive(Default)]
struct RecordingDurableSink {
requests: Mutex<Vec<DurableMessageRequest>>,
}
#[async_trait]
impl DurableMessageSink for RecordingDurableSink {
// Stores the request and answers with a sequential "durable-N" id (N = requests so far).
async fn send_agent_message(
&self,
request: DurableMessageRequest,
) -> Result<String, String> {
let mut requests = self.requests.lock().await;
requests.push(request);
Ok(format!("durable-{}", requests.len()))
}
}
// Builds a tool-call context for `thread_id` / `agent_id` on top of a snapshot of `store`.
// The run identity is created with `None` in the second and fourth positions; the second
// is the parent thread id (see `parent_with_thread_id_delivers`), the fourth is presumably
// the parent run id.
fn make_ctx_with_store(thread_id: &str, agent_id: &str, store: &StateStore) -> ToolCallContext {
ToolCallContext {
call_id: "call-1".into(),
tool_name: SEND_MESSAGE_TOOL_ID.into(),
run_identity: RunIdentity::new(
thread_id.to_string(),
None,
"run-1".to_string(),
None,
agent_id.to_string(),
awaken_contract::contract::identity::RunOrigin::User,
),
agent_spec: Arc::new(AgentSpec::default()),
snapshot: store.snapshot(),
activity_sink: None,
cancellation_token: None,
resume_input: None,
suspension_id: None,
suspension_reason: None,
}
}
// Same as `make_ctx_with_store`, but backed by a freshly created store.
fn make_ctx(thread_id: &str, agent_id: &str) -> ToolCallContext {
make_ctx_with_store(thread_id, agent_id, &StateStore::new())
}
// Creates a task manager wired to a new store and registers the background-task plugin's
// state keys on that store.
fn make_manager_and_store() -> (Arc<BackgroundTaskManager>, StateStore) {
use crate::phase::ExecutionEnv;
use crate::plugins::Plugin;
let store = StateStore::new();
let manager = Arc::new(BackgroundTaskManager::new());
manager.set_store(store.clone());
let plugin: Arc<dyn Plugin> = Arc::new(BackgroundTaskPlugin::new(manager.clone()));
let env = ExecutionEnv::from_plugins(&[plugin], &Default::default()).unwrap();
store.register_keys(&env.key_registrations).unwrap();
(manager, store)
}
// Builds the tool with a recording sink whose contents the test does not inspect.
fn make_tool(manager: Arc<BackgroundTaskManager>) -> SendMessageTool {
SendMessageTool::new(manager, Arc::new(RecordingDurableSink::default()))
}
// A child addressed by its task name, from the owning thread, gets an accepted receipt.
#[tokio::test]
async fn child_by_name_delivers_live() {
let (manager, store) = make_manager_and_store();
manager
.spawn_agent(
"thread-1",
Some("researcher"),
"desc",
TaskParentContext::default(),
|cancel, _s, mut rx| async move {
// The task checks its inbox once, 100 ms after start.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
if rx.try_recv().is_some() {
BgTaskResult::Success(json!({"got": true}))
} else {
cancel.cancelled().await;
BgTaskResult::Cancelled
}
},
)
.await
.unwrap();
let tool = make_tool(manager.clone());
let ctx = make_ctx_with_store("thread-1", "parent", &store);
let r = tool
.execute(
json!({"to": {"relation": "child", "name": "researcher"}, "message": "hi"}),
&ctx,
)
.await
.unwrap();
assert_eq!(r.result.data["status"], "accepted");
// Presumably gives the spawned task time to read the message and finish.
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
// A caller on a different thread cannot message another thread's child.
// NOTE(review): `resolve_child` already filters by owner thread, so the failure reported
// here appears to be `recipient_not_found` rather than `permission_denied`; the assertion
// only checks that the status is "failed".
#[tokio::test]
async fn child_wrong_thread_permission_denied() {
let (manager, store) = make_manager_and_store();
manager
.spawn_agent(
"thread-1",
Some("worker"),
"desc",
TaskParentContext::default(),
|cancel, _s, _r| async move {
cancel.cancelled().await;
BgTaskResult::Cancelled
},
)
.await
.unwrap();
let tool = make_tool(manager.clone());
let ctx = make_ctx_with_store("thread-WRONG", "attacker", &store);
let r = tool
.execute(
json!({"to": {"relation": "child", "name": "worker"}, "message": "x"}),
&ctx,
)
.await
.unwrap();
assert_eq!(r.result.data["status"], "failed");
manager.cancel_all("thread-1").await;
}
// The `agent` relation without `agent_id` goes through the durable sink with no agent set.
#[tokio::test]
async fn agent_delivers_durable() {
let (manager, _store) = make_manager_and_store();
let sink = Arc::new(RecordingDurableSink::default());
let tool = SendMessageTool::new(manager, sink.clone());
let ctx = make_ctx("thread-1", "sender");
let r = tool
.execute(
json!({"to": {"relation": "agent", "thread_id": "thread-2"}, "message": "hello"}),
&ctx,
)
.await
.unwrap();
assert_eq!(r.result.data["status"], "accepted");
let requests = sink.requests.lock().await;
assert_eq!(requests.len(), 1);
assert_eq!(requests[0].recipient_thread_id, "thread-2");
assert_eq!(requests[0].recipient_agent_id, None);
assert_eq!(requests[0].sender_agent_id, "sender");
assert_eq!(requests[0].message, "hello");
}
// Messaging the parent from a run that has no parent thread id fails as unavailable.
#[tokio::test]
async fn parent_no_context_unavailable() {
let (manager, _store) = make_manager_and_store();
let tool = make_tool(manager);
let ctx = make_ctx("thread-1", "child");
let r = tool
.execute(
json!({"to": {"relation": "parent"}, "message": "done"}),
&ctx,
)
.await
.unwrap();
assert_eq!(r.result.data["status"], "failed");
assert!(
r.result.data["error"]
.as_str()
.unwrap()
.contains("recipient_unavailable")
);
}
// With a parent thread id on the run identity, the message is routed to that thread.
#[tokio::test]
async fn parent_with_thread_id_delivers() {
let (manager, _store) = make_manager_and_store();
let sink = Arc::new(RecordingDurableSink::default());
let tool = SendMessageTool::new(manager, sink.clone());
let mut ctx = make_ctx("thread-child", "child-agent");
ctx.run_identity = awaken_contract::contract::identity::RunIdentity::new(
"thread-child".into(),
Some("thread-parent".into()),
"run-child".into(),
Some("run-parent".into()),
"child-agent".into(),
awaken_contract::contract::identity::RunOrigin::Subagent,
);
let r = tool
.execute(
json!({"to": {"relation": "parent"}, "message": "analysis complete"}),
&ctx,
)
.await
.unwrap();
assert_eq!(r.result.data["status"], "accepted");
let requests = sink.requests.lock().await;
assert_eq!(requests.len(), 1, "message should route to parent's thread");
assert_eq!(requests[0].recipient_thread_id, "thread-parent");
assert_eq!(requests[0].recipient_agent_id, None);
assert_eq!(requests[0].sender_agent_id, "child-agent");
assert_eq!(requests[0].message, "analysis complete");
}
// `to` without a `relation` field is rejected.
#[test]
fn rejects_missing_relation() {
let (m, _) = make_manager_and_store();
let t = make_tool(m);
assert!(
t.validate_args(&json!({"to": {}, "message": "hi"}))
.is_err()
);
}
// The `child` relation requires `name`.
#[test]
fn rejects_child_without_name() {
let (m, _) = make_manager_and_store();
let t = make_tool(m);
assert!(
t.validate_args(&json!({"to": {"relation": "child"}, "message": "hi"}))
.is_err()
);
}
// The `agent` relation requires `thread_id`.
#[test]
fn rejects_agent_without_thread_id() {
let (m, _) = make_manager_and_store();
let t = make_tool(m);
assert!(
t.validate_args(&json!({"to": {"relation": "agent"}, "message": "hi"}))
.is_err()
);
}
// Minimal valid arguments for each relation pass validation.
#[test]
fn accepts_valid_child() {
let (m, _) = make_manager_and_store();
let t = make_tool(m);
assert!(
t.validate_args(&json!({"to": {"relation": "child", "name": "r"}, "message": "hi"}))
.is_ok()
);
}
#[test]
fn accepts_valid_parent() {
let (m, _) = make_manager_and_store();
let t = make_tool(m);
assert!(
t.validate_args(&json!({"to": {"relation": "parent"}, "message": "hi"}))
.is_ok()
);
}
#[test]
fn accepts_valid_agent() {
let (m, _) = make_manager_and_store();
let t = make_tool(m);
assert!(
t.validate_args(
&json!({"to": {"relation": "agent", "thread_id": "t1"}, "message": "hi"})
)
.is_ok()
);
}
// A sub-agent run with `Some("run-parent")` as the fourth identity argument but no parent
// thread id still cannot reach its parent: routing depends on the parent thread id only.
#[tokio::test]
async fn parent_without_thread_id_returns_unavailable() {
let (manager, _store) = make_manager_and_store();
let tool = make_tool(manager);
let mut ctx = make_ctx("thread-1", "child");
ctx.run_identity = awaken_contract::contract::identity::RunIdentity::new(
"thread-1".into(),
None,
"run-child".into(),
Some("run-parent".into()), "child".into(),
awaken_contract::contract::identity::RunOrigin::Subagent,
);
let r = tool
.execute(
json!({"to": {"relation": "parent"}, "message": "hello parent"}),
&ctx,
)
.await
.unwrap();
assert_eq!(r.result.data["status"], "failed");
assert!(
r.result.data["error"]
.as_str()
.unwrap()
.contains("recipient_unavailable")
);
}
// When `agent_id` is supplied it is forwarded to the sink as the recipient agent.
#[tokio::test]
async fn agent_routing_includes_agent_id() {
let (manager, _store) = make_manager_and_store();
let sink = Arc::new(RecordingDurableSink::default());
let tool = SendMessageTool::new(manager, sink.clone());
let ctx = make_ctx("thread-1", "sender");
let r = tool
.execute(
json!({
"to": {"relation": "agent", "thread_id": "thread-target", "agent_id": "reviewer"},
"message": "please review"
}),
&ctx,
)
.await
.unwrap();
assert_eq!(r.result.data["status"], "accepted");
let requests = sink.requests.lock().await;
assert_eq!(requests.len(), 1);
assert_eq!(requests[0].recipient_agent_id.as_deref(), Some("reviewer"));
}
}