Skip to main content

brainwires_tool_runtime/sessions/
sessions_tool.rs

1//! [`SessionsTool`] — bundles `sessions_list`, `sessions_history`,
2//! `sessions_send`, and `sessions_spawn` over a [`SessionBroker`].
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use serde::Deserialize;
8use serde_json::{Value, json};
9
10use brainwires_core::{Tool, ToolContext, ToolInputSchema, ToolResult};
11
12use brainwires_stores::SessionId;
13use brainwires_stores::session::broker::{SessionBroker, SpawnRequest};
14
15/// Tool name for the list-sessions tool.
16pub const TOOL_SESSIONS_LIST: &str = "sessions_list";
17/// Tool name for the session-history tool.
18pub const TOOL_SESSIONS_HISTORY: &str = "sessions_history";
19/// Tool name for the send-to-session tool.
20pub const TOOL_SESSIONS_SEND: &str = "sessions_send";
21/// Tool name for the spawn-session tool.
22pub const TOOL_SESSIONS_SPAWN: &str = "sessions_spawn";
23
24/// Metadata key the host may set on [`ToolContext::metadata`] to carry the
25/// caller's own session id. Read by the spawn tool so the new session's
26/// `parent` pointer is correct.
27pub const CTX_METADATA_SESSION_ID: &str = "session_id";
28
29/// Maximum number of messages `sessions_history` will return in a single call.
30/// Protects the agent's context window against pathological transcripts.
31pub const MAX_HISTORY_LIMIT: usize = 500;
32const DEFAULT_HISTORY_LIMIT: usize = 50;
33
34/// Bundle of four session-control tools, all backed by a single
35/// [`SessionBroker`].
36///
37/// Construct one per agent session so the tool call sites know which session
38/// is "self" (for the `sessions_send` recursion check and for `sessions_spawn`
39/// parent-pointer wiring when the executor does not plumb the session id
40/// through [`ToolContext::metadata`]).
41pub struct SessionsTool {
42    broker: Arc<dyn SessionBroker>,
43    /// Caller's own session id. Consulted when [`ToolContext::metadata`]
44    /// does not contain `session_id`; `None` means the host did not tell us
45    /// — the spawn/self-send checks then fall back to returning a clear
46    /// error in the tool result.
47    current_session_id: Option<SessionId>,
48}
49
50impl SessionsTool {
51    /// Construct a new `SessionsTool`.
52    pub fn new(broker: Arc<dyn SessionBroker>, current_session_id: Option<SessionId>) -> Self {
53        Self {
54            broker,
55            current_session_id,
56        }
57    }
58
59    /// Return the four tool definitions this bundle exposes to the LLM.
60    pub fn get_tools() -> Vec<Tool> {
61        vec![
62            Self::list_tool(),
63            Self::history_tool(),
64            Self::send_tool(),
65            Self::spawn_tool(),
66        ]
67    }
68
69    // ── Tool schemas ────────────────────────────────────────────────────
70
71    fn list_tool() -> Tool {
72        Tool {
73            name: TOOL_SESSIONS_LIST.to_string(),
74            description:
75                "List every live chat session currently managed by the host — including the \
76                 caller's own session and any sessions the caller (or its peers) have spawned. \
77                 Use this to discover session ids before calling sessions_history or sessions_send. \
78                 Returns a JSON array of session summaries (id, channel, peer, timestamps, \
79                 message_count, optional parent)."
80                    .to_string(),
81            input_schema: ToolInputSchema::object(HashMap::new(), vec![]),
82            requires_approval: false,
83            ..Default::default()
84        }
85    }
86
87    fn history_tool() -> Tool {
88        let mut props = HashMap::new();
89        props.insert(
90            "session_id".to_string(),
91            json!({
92                "type": "string",
93                "description": "The target session id (from sessions_list)."
94            }),
95        );
96        props.insert(
97            "limit".to_string(),
98            json!({
99                "type": "number",
100                "description": format!(
101                    "Max messages to return (default {DEFAULT_HISTORY_LIMIT}, \
102                     hard-capped at {MAX_HISTORY_LIMIT})."
103                ),
104            }),
105        );
106        Tool {
107            name: TOOL_SESSIONS_HISTORY.to_string(),
108            description: "Return a target session's recent transcript as a JSON array of \
109                 {role, content, timestamp} objects (newest last). Use this to catch up \
110                 on what a spawned sub-session has produced, or to read another user's \
111                 ongoing conversation before intervening."
112                .to_string(),
113            input_schema: ToolInputSchema::object(props, vec!["session_id".to_string()]),
114            requires_approval: false,
115            ..Default::default()
116        }
117    }
118
119    fn send_tool() -> Tool {
120        let mut props = HashMap::new();
121        props.insert(
122            "session_id".to_string(),
123            json!({
124                "type": "string",
125                "description": "Target session id. Must not equal the caller's own session \
126                                (self-send is rejected to prevent recursion)."
127            }),
128        );
129        props.insert(
130            "text".to_string(),
131            json!({
132                "type": "string",
133                "description": "The user-role message to inject into the target session's \
134                                inbound queue."
135            }),
136        );
137        Tool {
138            name: TOOL_SESSIONS_SEND.to_string(),
139            description:
140                "Inject a user-role message into another session's inbound queue. Fire-and-forget: \
141                 returns {\"ok\": true} as soon as the message is queued; the target session \
142                 processes it asynchronously. Use this to nudge a spawned sub-session, relay \
143                 information between two user sessions, or ask a peer session a follow-up \
144                 question."
145                    .to_string(),
146            input_schema: ToolInputSchema::object(
147                props,
148                vec!["session_id".to_string(), "text".to_string()],
149            ),
150            // Sending on behalf of the agent into another live session is a
151            // cross-user side effect; the host should gate it with its
152            // normal approval policy.
153            requires_approval: true,
154            ..Default::default()
155        }
156    }
157
158    fn spawn_tool() -> Tool {
159        let mut props = HashMap::new();
160        props.insert(
161            "prompt".to_string(),
162            json!({
163                "type": "string",
164                "description": "Initial user message to seed the new session with."
165            }),
166        );
167        props.insert(
168            "model".to_string(),
169            json!({
170                "type": "string",
171                "description": "Optional model override (e.g. 'claude-opus-4-7'). Omit to inherit from parent."
172            }),
173        );
174        props.insert(
175            "system".to_string(),
176            json!({
177                "type": "string",
178                "description": "Optional system prompt for the sub-session. Omit to inherit."
179            }),
180        );
181        props.insert(
182            "tools".to_string(),
183            json!({
184                "type": "array",
185                "items": { "type": "string" },
186                "description": "Optional allow-list of tool names the sub-session may invoke. Omit to inherit the parent's toolset."
187            }),
188        );
189        props.insert(
190            "wait_for_first_reply".to_string(),
191            json!({
192                "type": "boolean",
193                "description": "If true, block this tool call until the sub-session produces \
194                                its first assistant message (or wait_timeout_secs elapses). \
195                                Default false — return immediately with just the session id.",
196                "default": false
197            }),
198        );
199        props.insert(
200            "wait_timeout_secs".to_string(),
201            json!({
202                "type": "number",
203                "description": "Seconds to wait when wait_for_first_reply is true (default 60).",
204                "default": 60
205            }),
206        );
207
208        Tool {
209            name: TOOL_SESSIONS_SPAWN.to_string(),
210            description:
211                "Spawn a new chat sub-session as a child of the current session, seeded with \
212                 `prompt`. Returns {session_id, first_reply?}. Use this to delegate a focused \
213                 task (e.g. 'spawn a research sub-session and return in 5m') — the parent can \
214                 later inspect progress via sessions_history or push updates via sessions_send."
215                    .to_string(),
216            input_schema: ToolInputSchema::object(props, vec!["prompt".to_string()]),
217            requires_approval: true,
218            ..Default::default()
219        }
220    }
221
222    // ── Execution ───────────────────────────────────────────────────────
223
224    /// Dispatch a tool call by name. Returns a [`ToolResult`] (never errors
225    /// out to an `anyhow::Result`; broker failures become `ToolResult::error`
226    /// so the LLM sees them as tool output rather than an executor crash).
227    pub async fn execute(
228        &self,
229        tool_use_id: &str,
230        tool_name: &str,
231        input: &Value,
232        context: &ToolContext,
233    ) -> ToolResult {
234        match tool_name {
235            TOOL_SESSIONS_LIST => self.exec_list(tool_use_id).await,
236            TOOL_SESSIONS_HISTORY => self.exec_history(tool_use_id, input).await,
237            TOOL_SESSIONS_SEND => self.exec_send(tool_use_id, input, context).await,
238            TOOL_SESSIONS_SPAWN => self.exec_spawn(tool_use_id, input, context).await,
239            other => ToolResult::error(
240                tool_use_id.to_string(),
241                format!("Unknown sessions tool: {other}"),
242            ),
243        }
244    }
245
246    async fn exec_list(&self, tool_use_id: &str) -> ToolResult {
247        match self.broker.list().await {
248            Ok(summaries) => match serde_json::to_string(&summaries) {
249                Ok(body) => ToolResult::success(tool_use_id.to_string(), body),
250                Err(e) => ToolResult::error(
251                    tool_use_id.to_string(),
252                    format!("Failed to serialize session list: {e}"),
253                ),
254            },
255            Err(e) => ToolResult::error(
256                tool_use_id.to_string(),
257                format!("sessions_list failed: {e}"),
258            ),
259        }
260    }
261
262    async fn exec_history(&self, tool_use_id: &str, input: &Value) -> ToolResult {
263        #[derive(Deserialize)]
264        struct In {
265            session_id: Option<String>,
266            #[serde(default)]
267            limit: Option<usize>,
268        }
269        let raw: In = match serde_json::from_value(input.clone()) {
270            Ok(v) => v,
271            Err(e) => {
272                return ToolResult::error(
273                    tool_use_id.to_string(),
274                    format!("Invalid sessions_history input: {e}"),
275                );
276            }
277        };
278        let sid = match raw.session_id.filter(|s| !s.is_empty()) {
279            Some(s) => SessionId(s),
280            None => {
281                return ToolResult::error(
282                    tool_use_id.to_string(),
283                    "sessions_history requires a non-empty `session_id`".to_string(),
284                );
285            }
286        };
287        let limit = Some(
288            raw.limit
289                .unwrap_or(DEFAULT_HISTORY_LIMIT)
290                .min(MAX_HISTORY_LIMIT),
291        );
292        match self.broker.history(&sid, limit).await {
293            Ok(msgs) => match serde_json::to_string(&msgs) {
294                Ok(body) => ToolResult::success(tool_use_id.to_string(), body),
295                Err(e) => ToolResult::error(
296                    tool_use_id.to_string(),
297                    format!("Failed to serialize session history: {e}"),
298                ),
299            },
300            Err(e) => ToolResult::error(
301                tool_use_id.to_string(),
302                format!("sessions_history failed: {e}"),
303            ),
304        }
305    }
306
307    async fn exec_send(
308        &self,
309        tool_use_id: &str,
310        input: &Value,
311        context: &ToolContext,
312    ) -> ToolResult {
313        #[derive(Deserialize)]
314        struct In {
315            session_id: Option<String>,
316            text: Option<String>,
317        }
318        let raw: In = match serde_json::from_value(input.clone()) {
319            Ok(v) => v,
320            Err(e) => {
321                return ToolResult::error(
322                    tool_use_id.to_string(),
323                    format!("Invalid sessions_send input: {e}"),
324                );
325            }
326        };
327        let sid = match raw.session_id.filter(|s| !s.is_empty()) {
328            Some(s) => SessionId(s),
329            None => {
330                return ToolResult::error(
331                    tool_use_id.to_string(),
332                    "sessions_send requires a non-empty `session_id`".to_string(),
333                );
334            }
335        };
336        let text = match raw.text {
337            Some(t) if !t.is_empty() => t,
338            _ => {
339                return ToolResult::error(
340                    tool_use_id.to_string(),
341                    "sessions_send requires a non-empty `text`".to_string(),
342                );
343            }
344        };
345
346        if let Some(self_id) = self.resolve_current_session_id(context)
347            && self_id == sid
348        {
349            return ToolResult::error(
350                tool_use_id.to_string(),
351                "sessions_send cannot target the caller's own session — that would recurse. \
352                 Use a spawned sub-session id, or address a peer session from sessions_list."
353                    .to_string(),
354            );
355        }
356
357        match self.broker.send(&sid, text).await {
358            Ok(()) => ToolResult::success(tool_use_id.to_string(), json!({"ok": true}).to_string()),
359            Err(e) => ToolResult::error(
360                tool_use_id.to_string(),
361                format!("sessions_send failed: {e}"),
362            ),
363        }
364    }
365
366    async fn exec_spawn(
367        &self,
368        tool_use_id: &str,
369        input: &Value,
370        context: &ToolContext,
371    ) -> ToolResult {
372        let req: SpawnRequest = match serde_json::from_value(input.clone()) {
373            Ok(v) => v,
374            Err(e) => {
375                return ToolResult::error(
376                    tool_use_id.to_string(),
377                    format!("Invalid sessions_spawn input: {e}"),
378                );
379            }
380        };
381        if req.prompt.is_empty() {
382            return ToolResult::error(
383                tool_use_id.to_string(),
384                "sessions_spawn requires a non-empty `prompt`".to_string(),
385            );
386        }
387        let parent = match self.resolve_current_session_id(context) {
388            Some(id) => id,
389            None => {
390                return ToolResult::error(
391                    tool_use_id.to_string(),
392                    "sessions_spawn could not determine the caller's session id — \
393                     host must set ToolContext::metadata[\"session_id\"] or pass \
394                     current_session_id into SessionsTool::new."
395                        .to_string(),
396                );
397            }
398        };
399
400        match self.broker.spawn(&parent, req).await {
401            Ok(spawned) => match serde_json::to_string(&spawned) {
402                Ok(body) => ToolResult::success(tool_use_id.to_string(), body),
403                Err(e) => ToolResult::error(
404                    tool_use_id.to_string(),
405                    format!("Failed to serialize spawned session: {e}"),
406                ),
407            },
408            Err(e) => ToolResult::error(
409                tool_use_id.to_string(),
410                format!("sessions_spawn failed: {e}"),
411            ),
412        }
413    }
414
415    fn resolve_current_session_id(&self, context: &ToolContext) -> Option<SessionId> {
416        context
417            .metadata
418            .get(CTX_METADATA_SESSION_ID)
419            .filter(|s| !s.is_empty())
420            .map(|s| SessionId(s.clone()))
421            .or_else(|| self.current_session_id.clone())
422    }
423}
424
425// ─────────────────────────────────────────────────────────────────────────────
426// Tests
427// ─────────────────────────────────────────────────────────────────────────────
428
429#[cfg(test)]
430mod tests {
431    use super::*;
432    use async_trait::async_trait;
433    use brainwires_stores::session::broker::{SessionMessage, SessionSummary, SpawnedSession};
434    use chrono::{TimeZone, Utc};
435    use std::sync::Mutex;
436
437    /// Hand-rolled test double — no mocking framework.
438    struct MockBroker {
439        list_ret: Mutex<Vec<SessionSummary>>,
440        history_ret: Mutex<Vec<SessionMessage>>,
441        // Captured inputs on each call.
442        history_calls: Mutex<Vec<(SessionId, Option<usize>)>>,
443        send_calls: Mutex<Vec<(SessionId, String)>>,
444        spawn_calls: Mutex<Vec<(SessionId, SpawnRequest)>>,
445        spawn_ret: Mutex<Option<SpawnedSession>>,
446    }
447
448    impl MockBroker {
449        fn new() -> Self {
450            Self {
451                list_ret: Mutex::new(Vec::new()),
452                history_ret: Mutex::new(Vec::new()),
453                history_calls: Mutex::new(Vec::new()),
454                send_calls: Mutex::new(Vec::new()),
455                spawn_calls: Mutex::new(Vec::new()),
456                spawn_ret: Mutex::new(None),
457            }
458        }
459    }
460
461    #[async_trait]
462    impl SessionBroker for MockBroker {
463        async fn list(&self) -> anyhow::Result<Vec<SessionSummary>> {
464            Ok(self.list_ret.lock().unwrap().clone())
465        }
466
467        async fn history(
468            &self,
469            id: &SessionId,
470            limit: Option<usize>,
471        ) -> anyhow::Result<Vec<SessionMessage>> {
472            self.history_calls.lock().unwrap().push((id.clone(), limit));
473            Ok(self.history_ret.lock().unwrap().clone())
474        }
475
476        async fn send(&self, id: &SessionId, text: String) -> anyhow::Result<()> {
477            self.send_calls.lock().unwrap().push((id.clone(), text));
478            Ok(())
479        }
480
481        async fn spawn(
482            &self,
483            parent: &SessionId,
484            req: SpawnRequest,
485        ) -> anyhow::Result<SpawnedSession> {
486            self.spawn_calls
487                .lock()
488                .unwrap()
489                .push((parent.clone(), req.clone()));
490            Ok(self
491                .spawn_ret
492                .lock()
493                .unwrap()
494                .clone()
495                .unwrap_or(SpawnedSession {
496                    id: SessionId("spawned-1".into()),
497                    first_reply: None,
498                }))
499        }
500    }
501
502    fn fixed_ts() -> chrono::DateTime<Utc> {
503        Utc.with_ymd_and_hms(2026, 4, 19, 12, 0, 0).unwrap()
504    }
505
506    fn ctx_with_session(session: &str) -> ToolContext {
507        let mut ctx = ToolContext::default();
508        ctx.metadata
509            .insert(CTX_METADATA_SESSION_ID.to_string(), session.to_string());
510        ctx
511    }
512
513    #[test]
514    fn list_tool_schema_shape() {
515        let tools = SessionsTool::get_tools();
516        let list = tools
517            .iter()
518            .find(|t| t.name == TOOL_SESSIONS_LIST)
519            .expect("list tool present");
520        // No required inputs.
521        let required = list.input_schema.required.clone().unwrap_or_default();
522        assert!(
523            required.is_empty(),
524            "sessions_list must have no required inputs, got {required:?}"
525        );
526        assert!(!list.description.is_empty());
527        // Four tools exposed.
528        let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
529        assert!(names.contains(&TOOL_SESSIONS_LIST));
530        assert!(names.contains(&TOOL_SESSIONS_HISTORY));
531        assert!(names.contains(&TOOL_SESSIONS_SEND));
532        assert!(names.contains(&TOOL_SESSIONS_SPAWN));
533    }
534
535    #[tokio::test]
536    async fn history_tool_rejects_missing_session_id() {
537        let broker = Arc::new(MockBroker::new());
538        let tool = SessionsTool::new(broker.clone(), Some(SessionId("self".into())));
539        let ctx = ctx_with_session("self");
540        let result = tool
541            .execute("call-1", TOOL_SESSIONS_HISTORY, &json!({}), &ctx)
542            .await;
543        assert!(result.is_error, "expected error result, got {result:?}");
544        assert!(
545            result.content.to_lowercase().contains("session_id"),
546            "error should mention session_id, got: {}",
547            result.content
548        );
549        assert!(
550            broker.history_calls.lock().unwrap().is_empty(),
551            "broker must not be called for invalid input"
552        );
553    }
554
555    #[tokio::test]
556    async fn history_tool_clamps_limit() {
557        let broker = Arc::new(MockBroker::new());
558        broker.history_ret.lock().unwrap().push(SessionMessage {
559            role: "user".into(),
560            content: "hi".into(),
561            timestamp: fixed_ts(),
562        });
563        let tool = SessionsTool::new(broker.clone(), Some(SessionId("self".into())));
564        let ctx = ctx_with_session("self");
565        let input = json!({"session_id": "target", "limit": 9999});
566        let result = tool
567            .execute("call-1", TOOL_SESSIONS_HISTORY, &input, &ctx)
568            .await;
569        assert!(!result.is_error, "unexpected error: {}", result.content);
570        let calls = broker.history_calls.lock().unwrap();
571        assert_eq!(calls.len(), 1);
572        assert_eq!(calls[0].0, SessionId("target".into()));
573        assert_eq!(
574            calls[0].1,
575            Some(MAX_HISTORY_LIMIT),
576            "limit must be clamped to MAX_HISTORY_LIMIT ({MAX_HISTORY_LIMIT})",
577        );
578    }
579
580    #[tokio::test]
581    async fn send_tool_self_send_rejected() {
582        let broker = Arc::new(MockBroker::new());
583        let tool = SessionsTool::new(broker.clone(), Some(SessionId("me".into())));
584        let ctx = ctx_with_session("me");
585        let input = json!({"session_id": "me", "text": "hello"});
586        let result = tool
587            .execute("call-1", TOOL_SESSIONS_SEND, &input, &ctx)
588            .await;
589        assert!(result.is_error);
590        assert!(
591            result.content.to_lowercase().contains("recurs"),
592            "error should mention recursion, got: {}",
593            result.content
594        );
595        assert!(broker.send_calls.lock().unwrap().is_empty());
596    }
597
598    #[tokio::test]
599    async fn send_tool_forwards_to_broker_when_distinct() {
600        let broker = Arc::new(MockBroker::new());
601        let tool = SessionsTool::new(broker.clone(), Some(SessionId("me".into())));
602        let ctx = ctx_with_session("me");
603        let input = json!({"session_id": "peer", "text": "ping"});
604        let result = tool
605            .execute("call-1", TOOL_SESSIONS_SEND, &input, &ctx)
606            .await;
607        assert!(!result.is_error, "unexpected error: {}", result.content);
608        let calls = broker.send_calls.lock().unwrap();
609        assert_eq!(calls.len(), 1);
610        assert_eq!(calls[0].0, SessionId("peer".into()));
611        assert_eq!(calls[0].1, "ping");
612        assert!(result.content.contains("\"ok\""));
613    }
614
615    #[tokio::test]
616    async fn spawn_tool_passes_through() {
617        let broker = Arc::new(MockBroker::new());
618        let tool = SessionsTool::new(broker.clone(), Some(SessionId("parent".into())));
619        let ctx = ctx_with_session("parent");
620        let input = json!({
621            "prompt": "research the openclaw parity gap",
622            "model": "claude-opus-4-7",
623            "system": "you are a research agent",
624            "tools": ["fetch_url", "query_codebase"],
625            "wait_for_first_reply": true,
626            "wait_timeout_secs": 30u64,
627        });
628        let result = tool
629            .execute("call-1", TOOL_SESSIONS_SPAWN, &input, &ctx)
630            .await;
631        assert!(!result.is_error, "unexpected error: {}", result.content);
632        let calls = broker.spawn_calls.lock().unwrap();
633        assert_eq!(calls.len(), 1);
634        assert_eq!(calls[0].0, SessionId("parent".into()));
635        let req = &calls[0].1;
636        assert_eq!(req.prompt, "research the openclaw parity gap");
637        assert_eq!(req.model.as_deref(), Some("claude-opus-4-7"));
638        assert_eq!(req.system.as_deref(), Some("you are a research agent"));
639        assert_eq!(
640            req.tools.as_deref(),
641            Some(["fetch_url".to_string(), "query_codebase".to_string()].as_slice())
642        );
643        assert!(req.wait_for_first_reply);
644        assert_eq!(req.wait_timeout_secs, 30);
645    }
646
647    #[tokio::test]
648    async fn spawn_tool_errors_without_parent_session() {
649        let broker = Arc::new(MockBroker::new());
650        // No metadata, no current_session_id → parent unknown.
651        let tool = SessionsTool::new(broker.clone(), None);
652        let ctx = ToolContext::default();
653        let input = json!({"prompt": "x"});
654        let result = tool
655            .execute("call-1", TOOL_SESSIONS_SPAWN, &input, &ctx)
656            .await;
657        assert!(result.is_error);
658        assert!(
659            result.content.contains("session") && result.content.to_lowercase().contains("caller")
660        );
661        assert!(broker.spawn_calls.lock().unwrap().is_empty());
662    }
663
664    #[tokio::test]
665    async fn list_returns_json_array() {
666        let broker = Arc::new(MockBroker::new());
667        broker.list_ret.lock().unwrap().push(SessionSummary {
668            id: SessionId("s1".into()),
669            channel: "discord".into(),
670            peer: "alice".into(),
671            created_at: fixed_ts(),
672            last_active: fixed_ts(),
673            message_count: 3,
674            parent: None,
675        });
676        let tool = SessionsTool::new(broker.clone(), Some(SessionId("me".into())));
677        let ctx = ctx_with_session("me");
678        let result = tool
679            .execute("c1", TOOL_SESSIONS_LIST, &json!({}), &ctx)
680            .await;
681        assert!(!result.is_error);
682        assert!(result.content.starts_with('['));
683        assert!(result.content.contains("\"s1\""));
684        assert!(result.content.contains("\"discord\""));
685    }
686}