Skip to main content

pawan/tools/
rmux.rs

//! RMUX tool: typed terminal multiplexer control for long-running agent workflows.
//!
//! Pawan uses this as the first integration point for durable terminal panes:
//! create/reuse a named RMUX session, drive input, wait for visible text, and
//! capture pane snapshots without scraping an ad-hoc shell subprocess.
6
7use std::time::Duration;
8
9use async_trait::async_trait;
10use rmux_sdk::{
11    EnsureSession, EnsureSessionPolicy, PaneProcessState, Rmux, SessionName, TerminalSizeSpec,
12};
13use serde::Deserialize;
14use serde_json::{json, Value};
15
16use super::Tool;
17use crate::{PawanError, Result};
18
/// Timeout, in seconds, used when a request omits `timeout_secs`.
const DEFAULT_TIMEOUT_SECS: u64 = 10;

/// Window index targeted when a request omits `window`.
const DEFAULT_WINDOW: u32 = 0;

/// Pane index targeted when a request omits `pane`.
const DEFAULT_PANE: u32 = 0;
22
23#[derive(Debug, Deserialize)]
24struct RmuxArgs {
25    action: String,
26    session: Option<String>,
27    window: Option<u32>,
28    pane: Option<u32>,
29    text: Option<String>,
30    key: Option<String>,
31    cols: Option<u16>,
32    rows: Option<u16>,
33    cwd: Option<String>,
34    command: Option<String>,
35    detached: Option<bool>,
36    timeout_secs: Option<u64>,
37    title: Option<String>,
38    title_prefix: Option<String>,
39    command_contains: Option<String>,
40    cwd_contains: Option<String>,
41    running: Option<bool>,
42}
43
/// Stateless entry point for the `rmux` tool.
///
/// The type carries no fields, so cloning or defaulting it is free. `Debug`
/// is derived so the tool can appear in debug output alongside other tools.
#[derive(Clone, Debug, Default)]
pub struct RmuxTool;
46
47impl RmuxTool {
48    pub fn new() -> Self {
49        Self
50    }
51
52    fn timeout(args: &RmuxArgs) -> Duration {
53        Duration::from_secs(args.timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS))
54    }
55
56    async fn client(args: &RmuxArgs) -> Result<Rmux> {
57        Rmux::builder()
58            .default_timeout(Self::timeout(args))
59            .connect_or_start()
60            .await
61            .map_err(|e| {
62                PawanError::Tool(format!(
63                    "rmux connect_or_start failed: {e}. Ensure the rmux binary is installed, on PATH, and able to start its daemon."
64                ))
65            })
66    }
67
68    fn session_name(args: &RmuxArgs) -> Result<SessionName> {
69        let session = args
70            .session
71            .as_deref()
72            .ok_or_else(|| PawanError::Tool("rmux session is required".into()))?;
73        SessionName::new(session.to_string())
74            .map_err(|e| PawanError::Tool(format!("invalid rmux session name: {e}")))
75    }
76
77    async fn pane(rmux: &Rmux, args: &RmuxArgs) -> Result<rmux_sdk::Pane> {
78        let session_name = Self::session_name(args)?;
79        let window = args.window.unwrap_or(DEFAULT_WINDOW);
80        let pane = args.pane.unwrap_or(DEFAULT_PANE);
81        let session = rmux.session(session_name.clone()).await.map_err(|e| {
82            PawanError::Tool(format!(
83                "rmux session lookup failed for session '{}': {e}",
84                session_name.as_str()
85            ))
86        })?;
87        Ok(session.pane(window, pane))
88    }
89
90    async fn ensure_session(args: RmuxArgs) -> Result<Value> {
91        let session_name = Self::session_name(&args)?;
92        if matches!((args.cols, args.rows), (Some(_), None) | (None, Some(_))) {
93            return Err(PawanError::Tool(
94                "rmux cols and rows must be supplied together".into(),
95            ));
96        }
97
98        let rmux = Self::client(&args).await?;
99        let mut ensure = EnsureSession::named(session_name)
100            .policy(EnsureSessionPolicy::CreateOrReuse)
101            .detached(args.detached.unwrap_or(true));
102        if let (Some(cols), Some(rows)) = (args.cols, args.rows) {
103            ensure = ensure.size(TerminalSizeSpec::new(cols, rows));
104        }
105        if let Some(cwd) = args.cwd.as_deref() {
106            ensure = ensure.working_directory(cwd.to_string());
107        }
108        if let Some(command) = args.command.as_deref() {
109            ensure = ensure.shell(command.to_string());
110        }
111
112        let session = rmux
113            .ensure_session(ensure)
114            .await
115            .map_err(|e| PawanError::Tool(format!("rmux ensure_session failed: {e}")))?;
116        Ok(json!({
117            "session": session.name().as_str(),
118            "created": session.was_created(),
119            "endpoint": format!("{:?}", session.endpoint()),
120        }))
121    }
122
123    async fn send_text(args: RmuxArgs) -> Result<Value> {
124        let _session_name = Self::session_name(&args)?;
125        let text = args
126            .text
127            .as_deref()
128            .ok_or_else(|| PawanError::Tool("rmux text is required for send_text".into()))?;
129        let rmux = Self::client(&args).await?;
130        let pane = Self::pane(&rmux, &args).await?;
131        pane.send_text(text)
132            .await
133            .map_err(|e| PawanError::Tool(format!("rmux send_text failed: {e}")))?;
134        Ok(json!({"ok": true}))
135    }
136
137    async fn send_key(args: RmuxArgs) -> Result<Value> {
138        let _session_name = Self::session_name(&args)?;
139        let key = args
140            .key
141            .as_deref()
142            .ok_or_else(|| PawanError::Tool("rmux key is required for send_key".into()))?;
143        let rmux = Self::client(&args).await?;
144        let pane = Self::pane(&rmux, &args).await?;
145        pane.send_key(key)
146            .await
147            .map_err(|e| PawanError::Tool(format!("rmux send_key failed: {e}")))?;
148        Ok(json!({"ok": true}))
149    }
150
151    async fn wait_for_text(args: RmuxArgs) -> Result<Value> {
152        let _session_name = Self::session_name(&args)?;
153        let text = args
154            .text
155            .as_deref()
156            .ok_or_else(|| PawanError::Tool("rmux text is required for wait_for_text".into()))?;
157        let rmux = Self::client(&args).await?;
158        let pane = Self::pane(&rmux, &args).await?;
159        pane.wait_for_text(text)
160            .await
161            .map_err(|e| PawanError::Tool(format!("rmux wait_for_text failed: {e}")))?;
162        Ok(json!({"ok": true}))
163    }
164
165    async fn snapshot(args: RmuxArgs) -> Result<Value> {
166        let _session_name = Self::session_name(&args)?;
167        let rmux = Self::client(&args).await?;
168        let pane = Self::pane(&rmux, &args).await?;
169        let snapshot = pane
170            .snapshot()
171            .await
172            .map_err(|e| PawanError::Tool(format!("rmux snapshot failed: {e}")))?;
173        let visible_text = snapshot.visible_text();
174        Ok(json!({
175            "cols": snapshot.cols,
176            "rows": snapshot.rows,
177            "revision": snapshot.revision,
178            "text": visible_text,
179            "visible_text": visible_text,
180        }))
181    }
182
183    async fn list_sessions(args: RmuxArgs) -> Result<Value> {
184        let rmux = Self::client(&args).await?;
185        let sessions = rmux
186            .list_sessions()
187            .await
188            .map_err(|e| PawanError::Tool(format!("rmux list_sessions failed: {e}")))?
189            .into_iter()
190            .map(|session| session.as_str().to_string())
191            .collect::<Vec<_>>();
192        let count = sessions.len();
193        Ok(json!({"sessions": sessions, "count": count}))
194    }
195
196    async fn list_panes(args: RmuxArgs) -> Result<Value> {
197        let rmux = Self::client(&args).await?;
198        let mut finder = rmux.find_panes();
199        if let Some(session) = args.session.as_deref() {
200            finder = finder.session(session);
201        }
202        if let Some(title) = args.title.as_deref() {
203            finder = finder.title(title);
204        }
205        if let Some(title_prefix) = args.title_prefix.as_deref() {
206            finder = finder.title_prefix(title_prefix);
207        }
208        if let Some(command_contains) = args.command_contains.as_deref() {
209            finder = finder.command_contains(command_contains);
210        }
211        if let Some(cwd_contains) = args.cwd_contains.as_deref() {
212            finder = finder.cwd_contains(cwd_contains);
213        }
214        if let Some(window) = args.window {
215            finder = finder.window_index(window);
216        }
217        if let Some(running) = args.running {
218            finder = if running {
219                finder.running()
220            } else {
221                finder.exited()
222            };
223        }
224
225        let panes = finder
226            .all()
227            .await
228            .map_err(|e| PawanError::Tool(format!("rmux list_panes failed: {e}")))?
229            .into_iter()
230            .map(|pane| {
231                json!({
232                    "session": pane.session_name.as_str(),
233                    "session_id": pane.session_id.as_u32(),
234                    "window_id": pane.window_id.as_u32(),
235                    "window_index": pane.window_index,
236                    "pane_id": pane.pane_id.as_u32(),
237                    "pane_index": pane.pane_index,
238                    "title": pane.title,
239                    "command": pane.command,
240                    "working_directory": pane.working_directory,
241                    "tags": pane.tags,
242                    "process": pane_process_state(&pane.process),
243                })
244            })
245            .collect::<Vec<_>>();
246        let count = panes.len();
247        Ok(json!({"panes": panes, "count": count}))
248    }
249
250    async fn kill_session(args: RmuxArgs) -> Result<Value> {
251        let session_name = Self::session_name(&args)?;
252        let rmux = Self::client(&args).await?;
253        let session = rmux.session(session_name.clone()).await.map_err(|e| {
254            PawanError::Tool(format!(
255                "rmux session lookup failed for session '{}': {e}",
256                session_name.as_str()
257            ))
258        })?;
259        let killed = session
260            .kill()
261            .await
262            .map_err(|e| PawanError::Tool(format!("rmux kill_session failed: {e}")))?;
263        Ok(json!({"killed": killed}))
264    }
265}
266
267fn pane_process_state(process: &PaneProcessState) -> Value {
268    match process {
269        PaneProcessState::Unknown => json!({"state": "unknown"}),
270        PaneProcessState::Running { pid } => json!({"state": "running", "pid": pid}),
271        PaneProcessState::Exited => json!({"state": "exited"}),
272        _ => json!({"state": "unknown"}),
273    }
274}
275
276#[async_trait]
277impl Tool for RmuxTool {
278    fn name(&self) -> &str {
279        "rmux"
280    }
281
282    fn description(&self) -> &str {
283        "Drive persistent RMUX terminal sessions/panes: list sessions/panes, ensure sessions, send input, wait for text, capture snapshots, and clean up sessions."
284    }
285
286    fn mutating(&self) -> bool {
287        true
288    }
289
290    fn parameters_schema(&self) -> Value {
291        json!({
292            "type": "object",
293            "properties": {
294                "action": {
295                    "type": "string",
296                    "enum": ["list_sessions", "list_panes", "ensure_session", "send_text", "send_key", "wait_for_text", "snapshot", "kill_session"],
297                    "description": "RMUX operation to perform"
298                },
299                "session": {"type": "string", "description": "RMUX session name"},
300                "window": {"type": "integer", "minimum": 0, "default": 0},
301                "pane": {"type": "integer", "minimum": 0, "default": 0},
302                "text": {"type": "string", "description": "Text to send or wait for"},
303                "key": {"type": "string", "description": "tmux/RMUX key token, e.g. Enter or C-c"},
304                "cols": {"type": "integer", "minimum": 1, "default": 120},
305                "rows": {"type": "integer", "minimum": 1, "default": 32},
306                "cwd": {"type": "string", "description": "Initial working directory for a new session"},
307                "command": {"type": "string", "description": "Initial shell command for a new session"},
308                "detached": {"type": "boolean", "default": true},
309                "timeout_secs": {"type": "integer", "minimum": 1, "default": 10},
310                "title": {"type": "string", "description": "Restrict list_panes to exact pane title"},
311                "title_prefix": {"type": "string", "description": "Restrict list_panes to pane titles starting with prefix"},
312                "command_contains": {"type": "string", "description": "Restrict list_panes to argv containing this text"},
313                "cwd_contains": {"type": "string", "description": "Restrict list_panes to working directories containing this text"},
314                "running": {"type": "boolean", "description": "Restrict list_panes to running panes when true, exited panes when false"}
315            },
316            "required": ["action"]
317        })
318    }
319
320    async fn execute(&self, args: Value) -> Result<Value> {
321        let args: RmuxArgs = serde_json::from_value(args)
322            .map_err(|e| PawanError::Tool(format!("invalid rmux args: {e}")))?;
323        match args.action.as_str() {
324            "list_sessions" => Self::list_sessions(args).await,
325            "list_panes" => Self::list_panes(args).await,
326            "ensure_session" => Self::ensure_session(args).await,
327            "send_text" => Self::send_text(args).await,
328            "send_key" => Self::send_key(args).await,
329            "wait_for_text" => Self::wait_for_text(args).await,
330            "snapshot" => Self::snapshot(args).await,
331            "kill_session" => Self::kill_session(args).await,
332            other => Err(PawanError::Tool(format!(
333                "unknown rmux action: {other}. Use list_sessions, list_panes, ensure_session, send_text, send_key, wait_for_text, snapshot, or kill_session"
334            ))),
335        }
336    }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Every action name that `execute` dispatches on.
    const SUPPORTED_ACTIONS: [&str; 8] = [
        "list_sessions",
        "list_panes",
        "ensure_session",
        "send_text",
        "send_key",
        "wait_for_text",
        "snapshot",
        "kill_session",
    ];

    /// Runs the tool with `args` and returns the rendered error.
    /// Panics if the call unexpectedly succeeds.
    async fn error_message(args: Value) -> String {
        RmuxTool::new().execute(args).await.unwrap_err().to_string()
    }

    // The schema enum and the dispatcher must not drift apart: check every
    // action, and check that the schema lists nothing beyond them.
    #[test]
    fn schema_lists_supported_actions() {
        let schema = RmuxTool::new().parameters_schema();
        let actions = schema["properties"]["action"]["enum"]
            .as_array()
            .expect("action enum");
        for expected in SUPPORTED_ACTIONS {
            assert!(
                actions.iter().any(|v| v == expected),
                "schema is missing action {expected}"
            );
        }
        assert_eq!(actions.len(), SUPPORTED_ACTIONS.len());
    }

    // All checks below must fail during argument validation, i.e. before the
    // tool tries to reach an rmux daemon.

    #[tokio::test]
    async fn rejects_missing_session_before_connecting() {
        let session_actions = [
            "ensure_session",
            "send_text",
            "send_key",
            "wait_for_text",
            "snapshot",
            "kill_session",
        ];
        for action in session_actions {
            let message = error_message(json!({"action": action, "timeout_secs": 1})).await;
            assert!(
                message.contains("rmux session is required"),
                "{action}: {message}"
            );
        }
    }

    #[tokio::test]
    async fn rejects_partial_terminal_size_before_connecting() {
        let cols_only = error_message(
            json!({"action": "ensure_session", "session": "dev", "cols": 120, "timeout_secs": 1}),
        )
        .await;
        assert!(cols_only.contains("rmux cols and rows must be supplied together"));

        let rows_only = error_message(
            json!({"action": "ensure_session", "session": "dev", "rows": 32, "timeout_secs": 1}),
        )
        .await;
        assert!(rows_only.contains("rmux cols and rows must be supplied together"));
    }

    #[tokio::test]
    async fn rejects_missing_text_before_connecting() {
        for action in ["send_text", "wait_for_text"] {
            let message =
                error_message(json!({"action": action, "session": "dev", "timeout_secs": 1}))
                    .await;
            assert!(
                message.contains(&format!("rmux text is required for {action}")),
                "{action}: {message}"
            );
        }
    }

    #[tokio::test]
    async fn rejects_missing_key_before_connecting() {
        let message =
            error_message(json!({"action": "send_key", "session": "dev", "timeout_secs": 1}))
                .await;
        assert!(message.contains("rmux key is required for send_key"));
    }

    #[tokio::test]
    async fn rejects_unknown_action_before_connecting() {
        let message = error_message(json!({"action": "teleport"})).await;
        assert!(message.contains("unknown rmux action"));
    }

    #[tokio::test]
    async fn rejects_malformed_args() {
        // `action` is the only field without a default, so omitting it must
        // surface as a deserialization error.
        let message = error_message(json!({"session": "dev"})).await;
        assert!(message.contains("invalid rmux args"));
    }
}