Skip to main content

fude/
acp_commands.rs

1//! Synchronous dispatch wrappers for ACP IPC commands.
2//! Runs the underlying async ops on a dedicated current-thread tokio runtime.
3
4use std::future::Future;
5use std::sync::Arc;
6
7use serde_json::Value;
8use tokio::runtime::Handle;
9
10use crate::acp::{ensure_acp, AcpState};
11use crate::events::EventEmitter;
12use crate::fs::FsState;
13
14pub struct AcpCtx {
15    pub state: Arc<AcpState>,
16    pub handle: Handle,
17}
18
19impl AcpCtx {
20    pub fn new(state: Arc<AcpState>) -> Self {
21        let (tx, rx) = std::sync::mpsc::channel();
22        std::thread::Builder::new()
23            .name("fude-acp-rt".into())
24            .spawn(move || {
25                let rt = tokio::runtime::Builder::new_current_thread()
26                    .enable_io()
27                    .enable_time()
28                    .build()
29                    .expect("tokio runtime");
30                tx.send(rt.handle().clone()).ok();
31                rt.block_on(std::future::pending::<()>());
32            })
33            .expect("spawn fude-acp-rt");
34        let handle = rx.recv().expect("tokio handle");
35        Self { state, handle }
36    }
37
38    pub fn run<F, T>(&self, fut: F) -> T
39    where
40        F: Future<Output = T> + Send + 'static,
41        T: Send + 'static,
42    {
43        let (tx, rx) = std::sync::mpsc::channel();
44        self.handle.spawn(async move {
45            let _ = tx.send(fut.await);
46        });
47        rx.recv().expect("tokio task panicked")
48    }
49}
50
51fn arg_str<'a>(args: &'a Value, key: &str) -> Result<&'a str, String> {
52    args.get(key)
53        .and_then(|v| v.as_str())
54        .ok_or_else(|| format!("missing arg: {}", key))
55}
56
57fn require_fs(fs: Option<&Arc<FsState>>) -> Result<&Arc<FsState>, String> {
58    fs.ok_or_else(|| "ACP commands require App::with_fs_sandbox".to_string())
59}
60
61pub fn get_adapter(ctx: &AcpCtx) -> Result<Value, String> {
62    let state = ctx.state.clone();
63    let name = ctx.run(async move { state.adapter.lock().await.clone() });
64    Ok(Value::from(name))
65}
66
67pub fn set_adapter(ctx: &AcpCtx, args: &Value) -> Result<Value, String> {
68    let next = arg_str(args, "adapter")?.to_string();
69    if ctx.state.find_adapter(&next).is_none() {
70        return Err(format!("Unknown ACP adapter: {}", next));
71    }
72    let state = ctx.state.clone();
73    let next_for_rt = next.clone();
74    ctx.run(async move {
75        let mut adapter_guard = state.adapter.lock().await;
76        if *adapter_guard != next_for_rt {
77            *adapter_guard = next_for_rt;
78            let mut process_guard = state.process.lock().await;
79            if let Some(acp) = process_guard.take() {
80                acp.kill().await;
81            }
82        }
83    });
84    Ok(Value::from(next))
85}
86
87pub fn initialize(
88    ctx: &AcpCtx,
89    fs: Option<&Arc<FsState>>,
90    emitter: &EventEmitter,
91) -> Result<Value, String> {
92    let fs = require_fs(fs)?;
93    let state = ctx.state.clone();
94    let ap = fs.allowed_paths.clone();
95    let ad = fs.allowed_dirs.clone();
96    let em = emitter.clone();
97    let client_name = state.client_name.clone();
98    let version = state.client_version.clone();
99    ctx.run(async move {
100        let acp = ensure_acp(&state, em, ap, ad).await?;
101        acp.request(
102            "initialize",
103            serde_json::json!({
104                "protocolVersion": 1,
105                "clientInfo": { "name": client_name, "title": client_name, "version": version },
106                "clientCapabilities": {
107                    "fs": { "readTextFile": true, "writeTextFile": true }
108                }
109            }),
110        )
111        .await
112    })
113}
114
115pub fn new_session(
116    ctx: &AcpCtx,
117    fs: Option<&Arc<FsState>>,
118    emitter: &EventEmitter,
119    args: &Value,
120) -> Result<Value, String> {
121    let fs = require_fs(fs)?;
122    let cwd = arg_str(args, "cwd")?.to_string();
123    let state = ctx.state.clone();
124    let ap = fs.allowed_paths.clone();
125    let ad = fs.allowed_dirs.clone();
126    let em = emitter.clone();
127    ctx.run(async move {
128        let acp = ensure_acp(&state, em, ap, ad).await?;
129        acp.request(
130            "session/new",
131            serde_json::json!({ "cwd": cwd, "mcpServers": [] }),
132        )
133        .await
134    })
135}
136
137pub fn prompt(
138    ctx: &AcpCtx,
139    fs: Option<&Arc<FsState>>,
140    emitter: &EventEmitter,
141    args: &Value,
142) -> Result<Value, String> {
143    let fs = require_fs(fs)?;
144    let session_id = arg_str(args, "sessionId")?.to_string();
145    let prompt = arg_str(args, "prompt")?.to_string();
146    let state = ctx.state.clone();
147    let ap = fs.allowed_paths.clone();
148    let ad = fs.allowed_dirs.clone();
149    let em = emitter.clone();
150    ctx.run(async move {
151        let acp = ensure_acp(&state, em, ap, ad).await?;
152        acp.request(
153            "session/prompt",
154            serde_json::json!({
155                "sessionId": session_id,
156                "prompt": [{ "type": "text", "text": prompt }]
157            }),
158        )
159        .await
160    })
161}
162
163pub fn cancel(
164    ctx: &AcpCtx,
165    fs: Option<&Arc<FsState>>,
166    emitter: &EventEmitter,
167    args: &Value,
168) -> Result<Value, String> {
169    let fs = require_fs(fs)?;
170    let session_id = arg_str(args, "sessionId")?.to_string();
171    let state = ctx.state.clone();
172    let ap = fs.allowed_paths.clone();
173    let ad = fs.allowed_dirs.clone();
174    let em = emitter.clone();
175    ctx.run(async move {
176        let acp = ensure_acp(&state, em, ap, ad).await?;
177        acp.notify(
178            "session/cancel",
179            serde_json::json!({ "sessionId": session_id }),
180        )
181        .await?;
182        Ok(Value::Null)
183    })
184}
185
186pub fn set_model(
187    ctx: &AcpCtx,
188    fs: Option<&Arc<FsState>>,
189    emitter: &EventEmitter,
190    args: &Value,
191) -> Result<Value, String> {
192    let fs = require_fs(fs)?;
193    let session_id = arg_str(args, "sessionId")?.to_string();
194    let model_id = arg_str(args, "modelId")?.to_string();
195    let state = ctx.state.clone();
196    let ap = fs.allowed_paths.clone();
197    let ad = fs.allowed_dirs.clone();
198    let em = emitter.clone();
199    ctx.run(async move {
200        let acp = ensure_acp(&state, em, ap, ad).await?;
201        acp.request(
202            "session/set_model",
203            serde_json::json!({ "sessionId": session_id, "modelId": model_id }),
204        )
205        .await
206    })
207}
208
209pub fn set_config(
210    ctx: &AcpCtx,
211    fs: Option<&Arc<FsState>>,
212    emitter: &EventEmitter,
213    args: &Value,
214) -> Result<Value, String> {
215    let fs = require_fs(fs)?;
216    let session_id = arg_str(args, "sessionId")?.to_string();
217    let config_id = arg_str(args, "configId")?.to_string();
218    let value = arg_str(args, "value")?.to_string();
219    let state = ctx.state.clone();
220    let ap = fs.allowed_paths.clone();
221    let ad = fs.allowed_dirs.clone();
222    let em = emitter.clone();
223    ctx.run(async move {
224        let acp = ensure_acp(&state, em, ap, ad).await?;
225        acp.request(
226            "session/set_config_option",
227            serde_json::json!({ "sessionId": session_id, "configId": config_id, "value": value }),
228        )
229        .await
230    })
231}
232
233pub fn list_sessions(
234    ctx: &AcpCtx,
235    fs: Option<&Arc<FsState>>,
236    emitter: &EventEmitter,
237    args: &Value,
238) -> Result<Value, String> {
239    let fs = require_fs(fs)?;
240    let cwd = args
241        .get("cwd")
242        .and_then(|v| v.as_str())
243        .map(|s| s.to_string());
244    let state = ctx.state.clone();
245    let ap = fs.allowed_paths.clone();
246    let ad = fs.allowed_dirs.clone();
247    let em = emitter.clone();
248    ctx.run(async move {
249        let acp = ensure_acp(&state, em, ap, ad).await?;
250        acp.request("session/list", serde_json::json!({ "cwd": cwd }))
251            .await
252    })
253}
254
255pub fn resume_session(
256    ctx: &AcpCtx,
257    fs: Option<&Arc<FsState>>,
258    emitter: &EventEmitter,
259    args: &Value,
260) -> Result<Value, String> {
261    let fs = require_fs(fs)?;
262    let session_id = arg_str(args, "sessionId")?.to_string();
263    let cwd = arg_str(args, "cwd")?.to_string();
264    let state = ctx.state.clone();
265    let ap = fs.allowed_paths.clone();
266    let ad = fs.allowed_dirs.clone();
267    let em = emitter.clone();
268    ctx.run(async move {
269        let acp = ensure_acp(&state, em, ap, ad).await?;
270        acp.request(
271            "session/resume",
272            serde_json::json!({ "sessionId": session_id, "cwd": cwd, "mcpServers": [] }),
273        )
274        .await
275    })
276}
277
278pub fn shutdown(ctx: &AcpCtx) -> Result<Value, String> {
279    let state = ctx.state.clone();
280    ctx.run(async move {
281        let mut guard = state.process.lock().await;
282        if let Some(acp) = guard.take() {
283            acp.kill().await;
284        }
285    });
286    Ok(Value::Null)
287}