// construct/tools/sessions.rs

1//! Session-to-session messaging tools for inter-agent communication.
2//!
3//! Provides three tools:
4//! - `sessions_list` — list active sessions with metadata
5//! - `sessions_history` — read message history from a specific session
6//! - `sessions_send` — send a message to a specific session
7
8use super::traits::{Tool, ToolResult};
9use crate::channels::session_backend::SessionBackend;
10use crate::security::SecurityPolicy;
11use crate::security::policy::ToolOperation;
12use async_trait::async_trait;
13use serde_json::json;
14use std::fmt::Write;
15use std::sync::Arc;
16
17/// Validate that a session ID is non-empty and contains at least one
18/// alphanumeric character (prevents blank keys after sanitization).
19fn validate_session_id(session_id: &str) -> Result<(), ToolResult> {
20    let trimmed = session_id.trim();
21    if trimmed.is_empty() || !trimmed.chars().any(|c| c.is_alphanumeric()) {
22        return Err(ToolResult {
23            success: false,
24            output: String::new(),
25            error: Some(
26                "Invalid 'session_id': must be non-empty and contain at least one alphanumeric character.".into(),
27            ),
28        });
29    }
30    Ok(())
31}
32
33// ── SessionsListTool ────────────────────────────────────────────────
34
35/// Lists active sessions with their channel, last activity time, and message count.
36pub struct SessionsListTool {
37    backend: Arc<dyn SessionBackend>,
38}
39
40impl SessionsListTool {
41    pub fn new(backend: Arc<dyn SessionBackend>) -> Self {
42        Self { backend }
43    }
44}
45
46#[async_trait]
47impl Tool for SessionsListTool {
48    fn name(&self) -> &str {
49        "sessions_list"
50    }
51
52    fn description(&self) -> &str {
53        "List all active conversation sessions with their channel, last activity time, and message count."
54    }
55
56    fn parameters_schema(&self) -> serde_json::Value {
57        json!({
58            "type": "object",
59            "properties": {
60                "limit": {
61                    "type": "integer",
62                    "description": "Max sessions to return (default: 50)"
63                }
64            }
65        })
66    }
67
68    async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
69        #[allow(clippy::cast_possible_truncation)]
70        let limit = args
71            .get("limit")
72            .and_then(serde_json::Value::as_u64)
73            .map_or(50, |v| v as usize);
74
75        let metadata = self.backend.list_sessions_with_metadata();
76
77        if metadata.is_empty() {
78            return Ok(ToolResult {
79                success: true,
80                output: "No active sessions found.".into(),
81                error: None,
82            });
83        }
84
85        let capped: Vec<_> = metadata.into_iter().take(limit).collect();
86        let mut output = format!("Found {} session(s):\n", capped.len());
87        for meta in &capped {
88            // Extract channel from key (convention: channel__identifier)
89            let channel = meta.key.split("__").next().unwrap_or(&meta.key);
90            let _ = writeln!(
91                output,
92                "- {}: channel={}, messages={}, last_activity={}",
93                meta.key, channel, meta.message_count, meta.last_activity
94            );
95        }
96
97        Ok(ToolResult {
98            success: true,
99            output,
100            error: None,
101        })
102    }
103}
104
105// ── SessionsHistoryTool ─────────────────────────────────────────────
106
107/// Reads the message history of a specific session by ID.
108pub struct SessionsHistoryTool {
109    backend: Arc<dyn SessionBackend>,
110    security: Arc<SecurityPolicy>,
111}
112
113impl SessionsHistoryTool {
114    pub fn new(backend: Arc<dyn SessionBackend>, security: Arc<SecurityPolicy>) -> Self {
115        Self { backend, security }
116    }
117}
118
119#[async_trait]
120impl Tool for SessionsHistoryTool {
121    fn name(&self) -> &str {
122        "sessions_history"
123    }
124
125    fn description(&self) -> &str {
126        "Read the message history of a specific session by its session ID. Returns the last N messages."
127    }
128
129    fn parameters_schema(&self) -> serde_json::Value {
130        json!({
131            "type": "object",
132            "properties": {
133                "session_id": {
134                    "type": "string",
135                    "description": "The session ID to read history from (e.g. telegram__user123)"
136                },
137                "limit": {
138                    "type": "integer",
139                    "description": "Max messages to return, from most recent (default: 20)"
140                }
141            },
142            "required": ["session_id"]
143        })
144    }
145
146    async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
147        if let Err(error) = self
148            .security
149            .enforce_tool_operation(ToolOperation::Read, "sessions_history")
150        {
151            return Ok(ToolResult {
152                success: false,
153                output: String::new(),
154                error: Some(error),
155            });
156        }
157
158        let session_id = args
159            .get("session_id")
160            .and_then(|v| v.as_str())
161            .ok_or_else(|| anyhow::anyhow!("Missing 'session_id' parameter"))?;
162
163        if let Err(result) = validate_session_id(session_id) {
164            return Ok(result);
165        }
166
167        #[allow(clippy::cast_possible_truncation)]
168        let limit = args
169            .get("limit")
170            .and_then(serde_json::Value::as_u64)
171            .map_or(20, |v| v as usize);
172
173        let messages = self.backend.load(session_id);
174
175        if messages.is_empty() {
176            return Ok(ToolResult {
177                success: true,
178                output: format!("No messages found for session '{session_id}'."),
179                error: None,
180            });
181        }
182
183        // Take the last `limit` messages
184        let start = messages.len().saturating_sub(limit);
185        let tail = &messages[start..];
186
187        let mut output = format!(
188            "Session '{}': showing {}/{} messages\n",
189            session_id,
190            tail.len(),
191            messages.len()
192        );
193        for msg in tail {
194            let _ = writeln!(output, "[{}] {}", msg.role, msg.content);
195        }
196
197        Ok(ToolResult {
198            success: true,
199            output,
200            error: None,
201        })
202    }
203}
204
205// ── SessionsSendTool ────────────────────────────────────────────────
206
207/// Sends a message to a specific session, enabling inter-agent communication.
208pub struct SessionsSendTool {
209    backend: Arc<dyn SessionBackend>,
210    security: Arc<SecurityPolicy>,
211}
212
213impl SessionsSendTool {
214    pub fn new(backend: Arc<dyn SessionBackend>, security: Arc<SecurityPolicy>) -> Self {
215        Self { backend, security }
216    }
217}
218
219#[async_trait]
220impl Tool for SessionsSendTool {
221    fn name(&self) -> &str {
222        "sessions_send"
223    }
224
225    fn description(&self) -> &str {
226        "Send a message to a specific session by its session ID. The message is appended to the session's conversation history as a 'user' message, enabling inter-agent communication."
227    }
228
229    fn parameters_schema(&self) -> serde_json::Value {
230        json!({
231            "type": "object",
232            "properties": {
233                "session_id": {
234                    "type": "string",
235                    "description": "The target session ID (e.g. telegram__user123)"
236                },
237                "message": {
238                    "type": "string",
239                    "description": "The message content to send"
240                }
241            },
242            "required": ["session_id", "message"]
243        })
244    }
245
246    async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
247        if let Err(error) = self
248            .security
249            .enforce_tool_operation(ToolOperation::Act, "sessions_send")
250        {
251            return Ok(ToolResult {
252                success: false,
253                output: String::new(),
254                error: Some(error),
255            });
256        }
257
258        let session_id = args
259            .get("session_id")
260            .and_then(|v| v.as_str())
261            .ok_or_else(|| anyhow::anyhow!("Missing 'session_id' parameter"))?;
262
263        if let Err(result) = validate_session_id(session_id) {
264            return Ok(result);
265        }
266
267        let message = args
268            .get("message")
269            .and_then(|v| v.as_str())
270            .ok_or_else(|| anyhow::anyhow!("Missing 'message' parameter"))?;
271
272        if message.trim().is_empty() {
273            return Ok(ToolResult {
274                success: false,
275                output: String::new(),
276                error: Some("Message content must not be empty.".into()),
277            });
278        }
279
280        let chat_msg = crate::providers::traits::ChatMessage::user(message);
281
282        match self.backend.append(session_id, &chat_msg) {
283            Ok(()) => Ok(ToolResult {
284                success: true,
285                output: format!("Message sent to session '{session_id}'."),
286                error: None,
287            }),
288            Err(e) => Ok(ToolResult {
289                success: false,
290                output: String::new(),
291                error: Some(format!("Failed to send message: {e}")),
292            }),
293        }
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::channels::session_store::SessionStore;
    use crate::providers::traits::ChatMessage;
    use tempfile::TempDir;

    /// Default security policy used by the history and send tool tests.
    fn test_security() -> Arc<SecurityPolicy> {
        Arc::new(SecurityPolicy::default())
    }

    /// Create a `SessionStore` rooted at a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned so the caller keeps the directory
    /// alive for the duration of the test.
    fn open_store() -> (TempDir, SessionStore) {
        let dir = TempDir::new().unwrap();
        let store = SessionStore::new(dir.path()).unwrap();
        (dir, store)
    }

    /// Backend with no sessions in it.
    fn test_backend() -> (TempDir, Arc<dyn SessionBackend>) {
        let (dir, store) = open_store();
        (dir, Arc::new(store))
    }

    /// Backend pre-populated with two sessions: `telegram__alice`
    /// (two messages) and `discord__bob` (one message).
    fn seeded_backend() -> (TempDir, Arc<dyn SessionBackend>) {
        let (dir, store) = open_store();
        let seed = |session: &str, msg: ChatMessage| store.append(session, &msg).unwrap();

        seed("telegram__alice", ChatMessage::user("Hello from Alice"));
        seed(
            "telegram__alice",
            ChatMessage::assistant("Hi Alice, how can I help?"),
        );
        seed("discord__bob", ChatMessage::user("Hey from Bob"));

        (dir, Arc::new(store))
    }

    // ── SessionsListTool tests ──────────────────────────────────────

    #[tokio::test]
    async fn list_empty_sessions() {
        let (_dir, backend) = test_backend();
        let lister = SessionsListTool::new(backend);

        let outcome = lister.execute(json!({})).await.unwrap();

        assert!(outcome.success);
        assert!(outcome.output.contains("No active sessions"));
    }

    #[tokio::test]
    async fn list_sessions_shows_all() {
        let (_dir, backend) = seeded_backend();
        let lister = SessionsListTool::new(backend);

        let outcome = lister.execute(json!({})).await.unwrap();

        assert!(outcome.success);
        let listing = &outcome.output;
        assert!(listing.contains("2 session(s)"));
        assert!(listing.contains("telegram__alice"));
        assert!(listing.contains("discord__bob"));
    }

    #[tokio::test]
    async fn list_sessions_respects_limit() {
        let (_dir, backend) = seeded_backend();
        let lister = SessionsListTool::new(backend);

        let outcome = lister.execute(json!({"limit": 1})).await.unwrap();

        assert!(outcome.success);
        assert!(outcome.output.contains("1 session(s)"));
    }

    #[tokio::test]
    async fn list_sessions_extracts_channel() {
        let (_dir, backend) = seeded_backend();
        let lister = SessionsListTool::new(backend);

        let outcome = lister.execute(json!({})).await.unwrap();

        let listing = &outcome.output;
        assert!(listing.contains("channel=telegram"));
        assert!(listing.contains("channel=discord"));
    }

    #[test]
    fn list_tool_name_and_schema() {
        let (_dir, backend) = test_backend();
        let lister = SessionsListTool::new(backend);

        assert_eq!(lister.name(), "sessions_list");
        let schema = lister.parameters_schema();
        assert!(schema["properties"]["limit"].is_object());
    }

    // ── SessionsHistoryTool tests ───────────────────────────────────

    #[tokio::test]
    async fn history_empty_session() {
        let (_dir, backend) = test_backend();
        let reader = SessionsHistoryTool::new(backend, test_security());
        let request = json!({"session_id": "nonexistent"});

        let outcome = reader.execute(request).await.unwrap();

        assert!(outcome.success);
        assert!(outcome.output.contains("No messages found"));
    }

    #[tokio::test]
    async fn history_returns_messages() {
        let (_dir, backend) = seeded_backend();
        let reader = SessionsHistoryTool::new(backend, test_security());
        let request = json!({"session_id": "telegram__alice"});

        let outcome = reader.execute(request).await.unwrap();

        assert!(outcome.success);
        let transcript = &outcome.output;
        assert!(transcript.contains("showing 2/2 messages"));
        assert!(transcript.contains("[user] Hello from Alice"));
        assert!(transcript.contains("[assistant] Hi Alice"));
    }

    #[tokio::test]
    async fn history_respects_limit() {
        let (_dir, backend) = seeded_backend();
        let reader = SessionsHistoryTool::new(backend, test_security());
        let request = json!({"session_id": "telegram__alice", "limit": 1});

        let outcome = reader.execute(request).await.unwrap();

        assert!(outcome.success);
        let transcript = &outcome.output;
        assert!(transcript.contains("showing 1/2 messages"));
        // Only the most recent message (the assistant reply) may appear.
        assert!(transcript.contains("[assistant]"));
        assert!(!transcript.contains("[user] Hello from Alice"));
    }

    #[tokio::test]
    async fn history_missing_session_id() {
        let (_dir, backend) = test_backend();
        let reader = SessionsHistoryTool::new(backend, test_security());

        let failure = reader.execute(json!({})).await.unwrap_err();

        assert!(failure.to_string().contains("session_id"));
    }

    #[tokio::test]
    async fn history_rejects_empty_session_id() {
        let (_dir, backend) = test_backend();
        let reader = SessionsHistoryTool::new(backend, test_security());

        let outcome = reader.execute(json!({"session_id": "   "})).await.unwrap();

        assert!(!outcome.success);
        let reason = outcome.error.unwrap();
        assert!(reason.contains("Invalid"));
    }

    #[test]
    fn history_tool_name_and_schema() {
        let (_dir, backend) = test_backend();
        let reader = SessionsHistoryTool::new(backend, test_security());

        assert_eq!(reader.name(), "sessions_history");
        let schema = reader.parameters_schema();
        assert!(schema["properties"]["session_id"].is_object());
        let required = schema["required"].as_array().unwrap();
        assert!(required.contains(&json!("session_id")));
    }

    // ── SessionsSendTool tests ──────────────────────────────────────

    #[tokio::test]
    async fn send_appends_message() {
        let (_dir, backend) = test_backend();
        let sender = SessionsSendTool::new(backend.clone(), test_security());
        let request = json!({
            "session_id": "telegram__alice",
            "message": "Hello from another agent"
        });

        let outcome = sender.execute(request).await.unwrap();

        assert!(outcome.success);
        assert!(outcome.output.contains("Message sent"));

        // The message must now be the only entry in the target session.
        let stored = backend.load("telegram__alice");
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].role, "user");
        assert_eq!(stored[0].content, "Hello from another agent");
    }

    #[tokio::test]
    async fn send_to_existing_session() {
        let (_dir, backend) = seeded_backend();
        let sender = SessionsSendTool::new(backend.clone(), test_security());
        let request = json!({
            "session_id": "telegram__alice",
            "message": "Inter-agent message"
        });

        let outcome = sender.execute(request).await.unwrap();

        assert!(outcome.success);

        // Two seeded messages plus the one just sent.
        let stored = backend.load("telegram__alice");
        assert_eq!(stored.len(), 3);
        assert_eq!(stored[2].content, "Inter-agent message");
    }

    #[tokio::test]
    async fn send_rejects_empty_message() {
        let (_dir, backend) = test_backend();
        let sender = SessionsSendTool::new(backend, test_security());
        let request = json!({
            "session_id": "telegram__alice",
            "message": "   "
        });

        let outcome = sender.execute(request).await.unwrap();

        assert!(!outcome.success);
        let reason = outcome.error.unwrap();
        assert!(reason.contains("empty"));
    }

    #[tokio::test]
    async fn send_rejects_empty_session_id() {
        let (_dir, backend) = test_backend();
        let sender = SessionsSendTool::new(backend, test_security());
        let request = json!({
            "session_id": "",
            "message": "hello"
        });

        let outcome = sender.execute(request).await.unwrap();

        assert!(!outcome.success);
        let reason = outcome.error.unwrap();
        assert!(reason.contains("Invalid"));
    }

    #[tokio::test]
    async fn send_rejects_non_alphanumeric_session_id() {
        let (_dir, backend) = test_backend();
        let sender = SessionsSendTool::new(backend, test_security());
        let request = json!({
            "session_id": "///",
            "message": "hello"
        });

        let outcome = sender.execute(request).await.unwrap();

        assert!(!outcome.success);
        let reason = outcome.error.unwrap();
        assert!(reason.contains("Invalid"));
    }

    #[tokio::test]
    async fn send_missing_session_id() {
        let (_dir, backend) = test_backend();
        let sender = SessionsSendTool::new(backend, test_security());

        let failure = sender.execute(json!({"message": "hi"})).await.unwrap_err();

        assert!(failure.to_string().contains("session_id"));
    }

    #[tokio::test]
    async fn send_missing_message() {
        let (_dir, backend) = test_backend();
        let sender = SessionsSendTool::new(backend, test_security());
        let request = json!({"session_id": "telegram__alice"});

        let failure = sender.execute(request).await.unwrap_err();

        assert!(failure.to_string().contains("message"));
    }

    #[test]
    fn send_tool_name_and_schema() {
        let (_dir, backend) = test_backend();
        let sender = SessionsSendTool::new(backend, test_security());

        assert_eq!(sender.name(), "sessions_send");
        let schema = sender.parameters_schema();
        let required = schema["required"].as_array().unwrap();
        assert!(required.contains(&json!("session_id")));
        assert!(required.contains(&json!("message")));
    }
}