1use std::collections::HashMap;
5use std::sync::Arc;
6
7use serde::Deserialize;
8use serde_json::{Value, json};
9
10use brainwires_core::{Tool, ToolContext, ToolInputSchema, ToolResult};
11
12use brainwires_stores::SessionId;
13use brainwires_stores::session::broker::{SessionBroker, SpawnRequest};
14
/// Name of the tool that lists live sessions.
pub const TOOL_SESSIONS_LIST: &str = "sessions_list";
/// Name of the tool that returns a session's transcript.
pub const TOOL_SESSIONS_HISTORY: &str = "sessions_history";
/// Name of the tool that injects a message into another session.
pub const TOOL_SESSIONS_SEND: &str = "sessions_send";
/// Name of the tool that spawns a child session.
pub const TOOL_SESSIONS_SPAWN: &str = "sessions_spawn";

/// Key in `ToolContext::metadata` under which the caller's session id is looked up.
pub const CTX_METADATA_SESSION_ID: &str = "session_id";

/// Upper bound applied to the `limit` argument of `sessions_history`.
pub const MAX_HISTORY_LIMIT: usize = 500;
/// `limit` used by `sessions_history` when the caller does not supply one.
const DEFAULT_HISTORY_LIMIT: usize = 50;
33
/// Exposes the `sessions_*` tools and forwards their execution to a [`SessionBroker`].
pub struct SessionsTool {
    /// Broker that every tool call is delegated to.
    broker: Arc<dyn SessionBroker>,
    /// Fallback caller session id, used when the tool context's metadata carries none.
    current_session_id: Option<SessionId>,
}
49
50impl SessionsTool {
51 pub fn new(broker: Arc<dyn SessionBroker>, current_session_id: Option<SessionId>) -> Self {
53 Self {
54 broker,
55 current_session_id,
56 }
57 }
58
59 pub fn get_tools() -> Vec<Tool> {
61 vec![
62 Self::list_tool(),
63 Self::history_tool(),
64 Self::send_tool(),
65 Self::spawn_tool(),
66 ]
67 }
68
69 fn list_tool() -> Tool {
72 Tool {
73 name: TOOL_SESSIONS_LIST.to_string(),
74 description:
75 "List every live chat session currently managed by the host — including the \
76 caller's own session and any sessions the caller (or its peers) have spawned. \
77 Use this to discover session ids before calling sessions_history or sessions_send. \
78 Returns a JSON array of session summaries (id, channel, peer, timestamps, \
79 message_count, optional parent)."
80 .to_string(),
81 input_schema: ToolInputSchema::object(HashMap::new(), vec![]),
82 requires_approval: false,
83 ..Default::default()
84 }
85 }
86
87 fn history_tool() -> Tool {
88 let mut props = HashMap::new();
89 props.insert(
90 "session_id".to_string(),
91 json!({
92 "type": "string",
93 "description": "The target session id (from sessions_list)."
94 }),
95 );
96 props.insert(
97 "limit".to_string(),
98 json!({
99 "type": "number",
100 "description": format!(
101 "Max messages to return (default {DEFAULT_HISTORY_LIMIT}, \
102 hard-capped at {MAX_HISTORY_LIMIT})."
103 ),
104 }),
105 );
106 Tool {
107 name: TOOL_SESSIONS_HISTORY.to_string(),
108 description: "Return a target session's recent transcript as a JSON array of \
109 {role, content, timestamp} objects (newest last). Use this to catch up \
110 on what a spawned sub-session has produced, or to read another user's \
111 ongoing conversation before intervening."
112 .to_string(),
113 input_schema: ToolInputSchema::object(props, vec!["session_id".to_string()]),
114 requires_approval: false,
115 ..Default::default()
116 }
117 }
118
119 fn send_tool() -> Tool {
120 let mut props = HashMap::new();
121 props.insert(
122 "session_id".to_string(),
123 json!({
124 "type": "string",
125 "description": "Target session id. Must not equal the caller's own session \
126 (self-send is rejected to prevent recursion)."
127 }),
128 );
129 props.insert(
130 "text".to_string(),
131 json!({
132 "type": "string",
133 "description": "The user-role message to inject into the target session's \
134 inbound queue."
135 }),
136 );
137 Tool {
138 name: TOOL_SESSIONS_SEND.to_string(),
139 description:
140 "Inject a user-role message into another session's inbound queue. Fire-and-forget: \
141 returns {\"ok\": true} as soon as the message is queued; the target session \
142 processes it asynchronously. Use this to nudge a spawned sub-session, relay \
143 information between two user sessions, or ask a peer session a follow-up \
144 question."
145 .to_string(),
146 input_schema: ToolInputSchema::object(
147 props,
148 vec!["session_id".to_string(), "text".to_string()],
149 ),
150 requires_approval: true,
154 ..Default::default()
155 }
156 }
157
158 fn spawn_tool() -> Tool {
159 let mut props = HashMap::new();
160 props.insert(
161 "prompt".to_string(),
162 json!({
163 "type": "string",
164 "description": "Initial user message to seed the new session with."
165 }),
166 );
167 props.insert(
168 "model".to_string(),
169 json!({
170 "type": "string",
171 "description": "Optional model override (e.g. 'claude-opus-4-7'). Omit to inherit from parent."
172 }),
173 );
174 props.insert(
175 "system".to_string(),
176 json!({
177 "type": "string",
178 "description": "Optional system prompt for the sub-session. Omit to inherit."
179 }),
180 );
181 props.insert(
182 "tools".to_string(),
183 json!({
184 "type": "array",
185 "items": { "type": "string" },
186 "description": "Optional allow-list of tool names the sub-session may invoke. Omit to inherit the parent's toolset."
187 }),
188 );
189 props.insert(
190 "wait_for_first_reply".to_string(),
191 json!({
192 "type": "boolean",
193 "description": "If true, block this tool call until the sub-session produces \
194 its first assistant message (or wait_timeout_secs elapses). \
195 Default false — return immediately with just the session id.",
196 "default": false
197 }),
198 );
199 props.insert(
200 "wait_timeout_secs".to_string(),
201 json!({
202 "type": "number",
203 "description": "Seconds to wait when wait_for_first_reply is true (default 60).",
204 "default": 60
205 }),
206 );
207
208 Tool {
209 name: TOOL_SESSIONS_SPAWN.to_string(),
210 description:
211 "Spawn a new chat sub-session as a child of the current session, seeded with \
212 `prompt`. Returns {session_id, first_reply?}. Use this to delegate a focused \
213 task (e.g. 'spawn a research sub-session and return in 5m') — the parent can \
214 later inspect progress via sessions_history or push updates via sessions_send."
215 .to_string(),
216 input_schema: ToolInputSchema::object(props, vec!["prompt".to_string()]),
217 requires_approval: true,
218 ..Default::default()
219 }
220 }
221
222 pub async fn execute(
228 &self,
229 tool_use_id: &str,
230 tool_name: &str,
231 input: &Value,
232 context: &ToolContext,
233 ) -> ToolResult {
234 match tool_name {
235 TOOL_SESSIONS_LIST => self.exec_list(tool_use_id).await,
236 TOOL_SESSIONS_HISTORY => self.exec_history(tool_use_id, input).await,
237 TOOL_SESSIONS_SEND => self.exec_send(tool_use_id, input, context).await,
238 TOOL_SESSIONS_SPAWN => self.exec_spawn(tool_use_id, input, context).await,
239 other => ToolResult::error(
240 tool_use_id.to_string(),
241 format!("Unknown sessions tool: {other}"),
242 ),
243 }
244 }
245
246 async fn exec_list(&self, tool_use_id: &str) -> ToolResult {
247 match self.broker.list().await {
248 Ok(summaries) => match serde_json::to_string(&summaries) {
249 Ok(body) => ToolResult::success(tool_use_id.to_string(), body),
250 Err(e) => ToolResult::error(
251 tool_use_id.to_string(),
252 format!("Failed to serialize session list: {e}"),
253 ),
254 },
255 Err(e) => ToolResult::error(
256 tool_use_id.to_string(),
257 format!("sessions_list failed: {e}"),
258 ),
259 }
260 }
261
262 async fn exec_history(&self, tool_use_id: &str, input: &Value) -> ToolResult {
263 #[derive(Deserialize)]
264 struct In {
265 session_id: Option<String>,
266 #[serde(default)]
267 limit: Option<usize>,
268 }
269 let raw: In = match serde_json::from_value(input.clone()) {
270 Ok(v) => v,
271 Err(e) => {
272 return ToolResult::error(
273 tool_use_id.to_string(),
274 format!("Invalid sessions_history input: {e}"),
275 );
276 }
277 };
278 let sid = match raw.session_id.filter(|s| !s.is_empty()) {
279 Some(s) => SessionId(s),
280 None => {
281 return ToolResult::error(
282 tool_use_id.to_string(),
283 "sessions_history requires a non-empty `session_id`".to_string(),
284 );
285 }
286 };
287 let limit = Some(
288 raw.limit
289 .unwrap_or(DEFAULT_HISTORY_LIMIT)
290 .min(MAX_HISTORY_LIMIT),
291 );
292 match self.broker.history(&sid, limit).await {
293 Ok(msgs) => match serde_json::to_string(&msgs) {
294 Ok(body) => ToolResult::success(tool_use_id.to_string(), body),
295 Err(e) => ToolResult::error(
296 tool_use_id.to_string(),
297 format!("Failed to serialize session history: {e}"),
298 ),
299 },
300 Err(e) => ToolResult::error(
301 tool_use_id.to_string(),
302 format!("sessions_history failed: {e}"),
303 ),
304 }
305 }
306
307 async fn exec_send(
308 &self,
309 tool_use_id: &str,
310 input: &Value,
311 context: &ToolContext,
312 ) -> ToolResult {
313 #[derive(Deserialize)]
314 struct In {
315 session_id: Option<String>,
316 text: Option<String>,
317 }
318 let raw: In = match serde_json::from_value(input.clone()) {
319 Ok(v) => v,
320 Err(e) => {
321 return ToolResult::error(
322 tool_use_id.to_string(),
323 format!("Invalid sessions_send input: {e}"),
324 );
325 }
326 };
327 let sid = match raw.session_id.filter(|s| !s.is_empty()) {
328 Some(s) => SessionId(s),
329 None => {
330 return ToolResult::error(
331 tool_use_id.to_string(),
332 "sessions_send requires a non-empty `session_id`".to_string(),
333 );
334 }
335 };
336 let text = match raw.text {
337 Some(t) if !t.is_empty() => t,
338 _ => {
339 return ToolResult::error(
340 tool_use_id.to_string(),
341 "sessions_send requires a non-empty `text`".to_string(),
342 );
343 }
344 };
345
346 if let Some(self_id) = self.resolve_current_session_id(context)
347 && self_id == sid
348 {
349 return ToolResult::error(
350 tool_use_id.to_string(),
351 "sessions_send cannot target the caller's own session — that would recurse. \
352 Use a spawned sub-session id, or address a peer session from sessions_list."
353 .to_string(),
354 );
355 }
356
357 match self.broker.send(&sid, text).await {
358 Ok(()) => ToolResult::success(tool_use_id.to_string(), json!({"ok": true}).to_string()),
359 Err(e) => ToolResult::error(
360 tool_use_id.to_string(),
361 format!("sessions_send failed: {e}"),
362 ),
363 }
364 }
365
366 async fn exec_spawn(
367 &self,
368 tool_use_id: &str,
369 input: &Value,
370 context: &ToolContext,
371 ) -> ToolResult {
372 let req: SpawnRequest = match serde_json::from_value(input.clone()) {
373 Ok(v) => v,
374 Err(e) => {
375 return ToolResult::error(
376 tool_use_id.to_string(),
377 format!("Invalid sessions_spawn input: {e}"),
378 );
379 }
380 };
381 if req.prompt.is_empty() {
382 return ToolResult::error(
383 tool_use_id.to_string(),
384 "sessions_spawn requires a non-empty `prompt`".to_string(),
385 );
386 }
387 let parent = match self.resolve_current_session_id(context) {
388 Some(id) => id,
389 None => {
390 return ToolResult::error(
391 tool_use_id.to_string(),
392 "sessions_spawn could not determine the caller's session id — \
393 host must set ToolContext::metadata[\"session_id\"] or pass \
394 current_session_id into SessionsTool::new."
395 .to_string(),
396 );
397 }
398 };
399
400 match self.broker.spawn(&parent, req).await {
401 Ok(spawned) => match serde_json::to_string(&spawned) {
402 Ok(body) => ToolResult::success(tool_use_id.to_string(), body),
403 Err(e) => ToolResult::error(
404 tool_use_id.to_string(),
405 format!("Failed to serialize spawned session: {e}"),
406 ),
407 },
408 Err(e) => ToolResult::error(
409 tool_use_id.to_string(),
410 format!("sessions_spawn failed: {e}"),
411 ),
412 }
413 }
414
415 fn resolve_current_session_id(&self, context: &ToolContext) -> Option<SessionId> {
416 context
417 .metadata
418 .get(CTX_METADATA_SESSION_ID)
419 .filter(|s| !s.is_empty())
420 .map(|s| SessionId(s.clone()))
421 .or_else(|| self.current_session_id.clone())
422 }
423}
424
425#[cfg(test)]
430mod tests {
431 use super::*;
432 use async_trait::async_trait;
433 use brainwires_stores::session::broker::{SessionMessage, SessionSummary, SpawnedSession};
434 use chrono::{TimeZone, Utc};
435 use std::sync::Mutex;
436
    /// In-memory `SessionBroker` stand-in that returns canned values and records calls.
    struct MockBroker {
        /// Value returned by `list`.
        list_ret: Mutex<Vec<SessionSummary>>,
        /// Value returned by `history`.
        history_ret: Mutex<Vec<SessionMessage>>,
        /// Arguments of every `history` call, in call order.
        history_calls: Mutex<Vec<(SessionId, Option<usize>)>>,
        /// Arguments of every `send` call, in call order.
        send_calls: Mutex<Vec<(SessionId, String)>>,
        /// Arguments of every `spawn` call, in call order.
        spawn_calls: Mutex<Vec<(SessionId, SpawnRequest)>>,
        /// Value returned by `spawn`; when `None`, a default `spawned-1` session is returned.
        spawn_ret: Mutex<Option<SpawnedSession>>,
    }
447
448 impl MockBroker {
449 fn new() -> Self {
450 Self {
451 list_ret: Mutex::new(Vec::new()),
452 history_ret: Mutex::new(Vec::new()),
453 history_calls: Mutex::new(Vec::new()),
454 send_calls: Mutex::new(Vec::new()),
455 spawn_calls: Mutex::new(Vec::new()),
456 spawn_ret: Mutex::new(None),
457 }
458 }
459 }
460
461 #[async_trait]
462 impl SessionBroker for MockBroker {
463 async fn list(&self) -> anyhow::Result<Vec<SessionSummary>> {
464 Ok(self.list_ret.lock().unwrap().clone())
465 }
466
467 async fn history(
468 &self,
469 id: &SessionId,
470 limit: Option<usize>,
471 ) -> anyhow::Result<Vec<SessionMessage>> {
472 self.history_calls.lock().unwrap().push((id.clone(), limit));
473 Ok(self.history_ret.lock().unwrap().clone())
474 }
475
476 async fn send(&self, id: &SessionId, text: String) -> anyhow::Result<()> {
477 self.send_calls.lock().unwrap().push((id.clone(), text));
478 Ok(())
479 }
480
481 async fn spawn(
482 &self,
483 parent: &SessionId,
484 req: SpawnRequest,
485 ) -> anyhow::Result<SpawnedSession> {
486 self.spawn_calls
487 .lock()
488 .unwrap()
489 .push((parent.clone(), req.clone()));
490 Ok(self
491 .spawn_ret
492 .lock()
493 .unwrap()
494 .clone()
495 .unwrap_or(SpawnedSession {
496 id: SessionId("spawned-1".into()),
497 first_reply: None,
498 }))
499 }
500 }
501
502 fn fixed_ts() -> chrono::DateTime<Utc> {
503 Utc.with_ymd_and_hms(2026, 4, 19, 12, 0, 0).unwrap()
504 }
505
506 fn ctx_with_session(session: &str) -> ToolContext {
507 let mut ctx = ToolContext::default();
508 ctx.metadata
509 .insert(CTX_METADATA_SESSION_ID.to_string(), session.to_string());
510 ctx
511 }
512
513 #[test]
514 fn list_tool_schema_shape() {
515 let tools = SessionsTool::get_tools();
516 let list = tools
517 .iter()
518 .find(|t| t.name == TOOL_SESSIONS_LIST)
519 .expect("list tool present");
520 let required = list.input_schema.required.clone().unwrap_or_default();
522 assert!(
523 required.is_empty(),
524 "sessions_list must have no required inputs, got {required:?}"
525 );
526 assert!(!list.description.is_empty());
527 let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
529 assert!(names.contains(&TOOL_SESSIONS_LIST));
530 assert!(names.contains(&TOOL_SESSIONS_HISTORY));
531 assert!(names.contains(&TOOL_SESSIONS_SEND));
532 assert!(names.contains(&TOOL_SESSIONS_SPAWN));
533 }
534
535 #[tokio::test]
536 async fn history_tool_rejects_missing_session_id() {
537 let broker = Arc::new(MockBroker::new());
538 let tool = SessionsTool::new(broker.clone(), Some(SessionId("self".into())));
539 let ctx = ctx_with_session("self");
540 let result = tool
541 .execute("call-1", TOOL_SESSIONS_HISTORY, &json!({}), &ctx)
542 .await;
543 assert!(result.is_error, "expected error result, got {result:?}");
544 assert!(
545 result.content.to_lowercase().contains("session_id"),
546 "error should mention session_id, got: {}",
547 result.content
548 );
549 assert!(
550 broker.history_calls.lock().unwrap().is_empty(),
551 "broker must not be called for invalid input"
552 );
553 }
554
555 #[tokio::test]
556 async fn history_tool_clamps_limit() {
557 let broker = Arc::new(MockBroker::new());
558 broker.history_ret.lock().unwrap().push(SessionMessage {
559 role: "user".into(),
560 content: "hi".into(),
561 timestamp: fixed_ts(),
562 });
563 let tool = SessionsTool::new(broker.clone(), Some(SessionId("self".into())));
564 let ctx = ctx_with_session("self");
565 let input = json!({"session_id": "target", "limit": 9999});
566 let result = tool
567 .execute("call-1", TOOL_SESSIONS_HISTORY, &input, &ctx)
568 .await;
569 assert!(!result.is_error, "unexpected error: {}", result.content);
570 let calls = broker.history_calls.lock().unwrap();
571 assert_eq!(calls.len(), 1);
572 assert_eq!(calls[0].0, SessionId("target".into()));
573 assert_eq!(
574 calls[0].1,
575 Some(MAX_HISTORY_LIMIT),
576 "limit must be clamped to MAX_HISTORY_LIMIT ({MAX_HISTORY_LIMIT})",
577 );
578 }
579
580 #[tokio::test]
581 async fn send_tool_self_send_rejected() {
582 let broker = Arc::new(MockBroker::new());
583 let tool = SessionsTool::new(broker.clone(), Some(SessionId("me".into())));
584 let ctx = ctx_with_session("me");
585 let input = json!({"session_id": "me", "text": "hello"});
586 let result = tool
587 .execute("call-1", TOOL_SESSIONS_SEND, &input, &ctx)
588 .await;
589 assert!(result.is_error);
590 assert!(
591 result.content.to_lowercase().contains("recurs"),
592 "error should mention recursion, got: {}",
593 result.content
594 );
595 assert!(broker.send_calls.lock().unwrap().is_empty());
596 }
597
598 #[tokio::test]
599 async fn send_tool_forwards_to_broker_when_distinct() {
600 let broker = Arc::new(MockBroker::new());
601 let tool = SessionsTool::new(broker.clone(), Some(SessionId("me".into())));
602 let ctx = ctx_with_session("me");
603 let input = json!({"session_id": "peer", "text": "ping"});
604 let result = tool
605 .execute("call-1", TOOL_SESSIONS_SEND, &input, &ctx)
606 .await;
607 assert!(!result.is_error, "unexpected error: {}", result.content);
608 let calls = broker.send_calls.lock().unwrap();
609 assert_eq!(calls.len(), 1);
610 assert_eq!(calls[0].0, SessionId("peer".into()));
611 assert_eq!(calls[0].1, "ping");
612 assert!(result.content.contains("\"ok\""));
613 }
614
615 #[tokio::test]
616 async fn spawn_tool_passes_through() {
617 let broker = Arc::new(MockBroker::new());
618 let tool = SessionsTool::new(broker.clone(), Some(SessionId("parent".into())));
619 let ctx = ctx_with_session("parent");
620 let input = json!({
621 "prompt": "research the openclaw parity gap",
622 "model": "claude-opus-4-7",
623 "system": "you are a research agent",
624 "tools": ["fetch_url", "query_codebase"],
625 "wait_for_first_reply": true,
626 "wait_timeout_secs": 30u64,
627 });
628 let result = tool
629 .execute("call-1", TOOL_SESSIONS_SPAWN, &input, &ctx)
630 .await;
631 assert!(!result.is_error, "unexpected error: {}", result.content);
632 let calls = broker.spawn_calls.lock().unwrap();
633 assert_eq!(calls.len(), 1);
634 assert_eq!(calls[0].0, SessionId("parent".into()));
635 let req = &calls[0].1;
636 assert_eq!(req.prompt, "research the openclaw parity gap");
637 assert_eq!(req.model.as_deref(), Some("claude-opus-4-7"));
638 assert_eq!(req.system.as_deref(), Some("you are a research agent"));
639 assert_eq!(
640 req.tools.as_deref(),
641 Some(["fetch_url".to_string(), "query_codebase".to_string()].as_slice())
642 );
643 assert!(req.wait_for_first_reply);
644 assert_eq!(req.wait_timeout_secs, 30);
645 }
646
647 #[tokio::test]
648 async fn spawn_tool_errors_without_parent_session() {
649 let broker = Arc::new(MockBroker::new());
650 let tool = SessionsTool::new(broker.clone(), None);
652 let ctx = ToolContext::default();
653 let input = json!({"prompt": "x"});
654 let result = tool
655 .execute("call-1", TOOL_SESSIONS_SPAWN, &input, &ctx)
656 .await;
657 assert!(result.is_error);
658 assert!(
659 result.content.contains("session") && result.content.to_lowercase().contains("caller")
660 );
661 assert!(broker.spawn_calls.lock().unwrap().is_empty());
662 }
663
664 #[tokio::test]
665 async fn list_returns_json_array() {
666 let broker = Arc::new(MockBroker::new());
667 broker.list_ret.lock().unwrap().push(SessionSummary {
668 id: SessionId("s1".into()),
669 channel: "discord".into(),
670 peer: "alice".into(),
671 created_at: fixed_ts(),
672 last_active: fixed_ts(),
673 message_count: 3,
674 parent: None,
675 });
676 let tool = SessionsTool::new(broker.clone(), Some(SessionId("me".into())));
677 let ctx = ctx_with_session("me");
678 let result = tool
679 .execute("c1", TOOL_SESSIONS_LIST, &json!({}), &ctx)
680 .await;
681 assert!(!result.is_error);
682 assert!(result.content.starts_with('['));
683 assert!(result.content.contains("\"s1\""));
684 assert!(result.content.contains("\"discord\""));
685 }
686}