Skip to main content

plexus_substrate/activations/claudecode_loopback/
activation.rs

1use super::storage::{LoopbackStorage, LoopbackStorageConfig};
2use super::types::*;
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::hub_methods;
6use serde_json::{json, Value};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::time::sleep;
10
11/// ClaudeCode Loopback - routes tool permissions back to parent for approval
12#[derive(Clone)]
13pub struct ClaudeCodeLoopback {
14    storage: Arc<LoopbackStorage>,
15    mcp_url: String,
16}
17
18impl ClaudeCodeLoopback {
19    pub async fn new(config: LoopbackStorageConfig) -> Result<Self, String> {
20        let storage = LoopbackStorage::new(config).await?;
21        let mcp_url = std::env::var("PLEXUS_MCP_URL")
22            .unwrap_or_else(|_| "http://127.0.0.1:4445/mcp".to_string());
23
24        Ok(Self {
25            storage: Arc::new(storage),
26            mcp_url,
27        })
28    }
29
30    pub fn with_mcp_url(mut self, url: String) -> Self {
31        self.mcp_url = url;
32        self
33    }
34
35    /// Get the underlying storage (for sharing with ClaudeCode)
36    pub fn storage(&self) -> Arc<LoopbackStorage> {
37        self.storage.clone()
38    }
39}
40
41#[hub_methods(
42    namespace = "loopback",
43    version = "1.0.0",
44    description = "Route tool permissions to parent for approval"
45)]
46impl ClaudeCodeLoopback {
47    /// Permission prompt handler - blocks until parent approves/denies
48    ///
49    /// This is called by Claude Code CLI via --permission-prompt-tool.
50    /// It blocks (polls) until the parent calls loopback.respond().
51    ///
52    /// Returns a JSON string (not object) because Claude Code expects the MCP response
53    /// to have the permission JSON already stringified in content[0].text.
54    /// See: https://github.com/anthropics/claude-code/blob/main/docs/permission-prompt-tool.md
55    #[plexus_macros::hub_method(params(
56        tool_name = "Name of the tool being requested",
57        tool_use_id = "Unique ID for this tool invocation",
58        input = "Tool input parameters"
59    ))]
60    async fn permit(
61        &self,
62        tool_name: String,
63        tool_use_id: String,
64        input: Value,
65    ) -> impl Stream<Item = String> + Send + 'static {
66        // IMMEDIATE DEBUG: Log before stream starts
67        eprintln!("[LOOPBACK] permit called: tool={}, tool_use_id={}", tool_name, tool_use_id);
68
69        let storage = self.storage.clone();
70
71        // Look up session ID from pre-registered tool_use_id mapping
72        // This mapping was set by run_chat_background when it saw the ToolUse event
73        let session_id = storage.lookup_session_by_tool(&tool_use_id)
74            .unwrap_or_else(|| "unknown".to_string());
75
76        stream! {
77            // DEBUG: Log the lookup result
78            eprintln!("[LOOPBACK] permit: tool_use_id={} mapped to session_id={}", tool_use_id, session_id);
79
80            // Create approval request
81            let approval = match storage.create_approval(
82                &session_id,
83                &tool_name,
84                &tool_use_id,
85                &input,
86            ).await {
87                Ok(a) => a,
88                Err(e) => {
89                    // Return deny response as JSON string
90                    let response = json!({
91                        "behavior": "deny",
92                        "message": format!("Failed to create approval: {}", e)
93                    });
94                    yield response.to_string();
95                    return;
96                }
97            };
98
99            let approval_id = approval.id;
100            let timeout_secs = 300u64; // 5 minute timeout
101            let poll_interval = Duration::from_secs(1);
102            let start = std::time::Instant::now();
103
104            // Blocking poll loop - like HumanLayer's hlyr
105            loop {
106                // Check timeout
107                if start.elapsed().as_secs() > timeout_secs {
108                    let _ = storage.resolve_approval(&approval_id, false, Some("Timed out".to_string())).await;
109                    let response = json!({
110                        "behavior": "deny",
111                        "message": "Approval request timed out"
112                    });
113                    yield response.to_string();
114                    return;
115                }
116
117                // Poll for resolution
118                match storage.get_approval(&approval_id).await {
119                    Ok(current) => {
120                        match current.status {
121                            ApprovalStatus::Approved => {
122                                // Return allow response as JSON string
123                                // Claude Code expects: {"behavior": "allow", "updatedInput": {...}}
124                                let response = json!({
125                                    "behavior": "allow",
126                                    "updatedInput": input.clone()
127                                });
128                                yield response.to_string();
129                                return;
130                            }
131                            ApprovalStatus::Denied => {
132                                let response = json!({
133                                    "behavior": "deny",
134                                    "message": current.response_message.unwrap_or_else(|| "Denied by parent".to_string())
135                                });
136                                yield response.to_string();
137                                return;
138                            }
139                            ApprovalStatus::TimedOut => {
140                                let response = json!({
141                                    "behavior": "deny",
142                                    "message": "Approval timed out"
143                                });
144                                yield response.to_string();
145                                return;
146                            }
147                            ApprovalStatus::Pending => {
148                                // Continue polling
149                            }
150                        }
151                    }
152                    Err(e) => {
153                        let response = json!({
154                            "behavior": "deny",
155                            "message": format!("Failed to check approval: {}", e)
156                        });
157                        yield response.to_string();
158                        return;
159                    }
160                }
161
162                sleep(poll_interval).await;
163            }
164        }
165    }
166
167    /// Respond to a pending approval request
168    #[plexus_macros::hub_method(params(
169        approval_id = "ID of the approval request",
170        approve = "Whether to approve (true) or deny (false)",
171        message = "Optional message/reason"
172    ))]
173    async fn respond(
174        &self,
175        approval_id: ApprovalId,
176        approve: bool,
177        message: Option<String>,
178    ) -> impl Stream<Item = RespondResult> + Send + 'static {
179        let storage = self.storage.clone();
180
181        stream! {
182            match storage.resolve_approval(&approval_id, approve, message).await {
183                Ok(()) => {
184                    yield RespondResult::Ok { approval_id };
185                }
186                Err(e) => {
187                    yield RespondResult::Err { message: e };
188                }
189            }
190        }
191    }
192
193    /// List pending approval requests
194    #[plexus_macros::hub_method(params(
195        session_id = "Optional session ID to filter by"
196    ))]
197    async fn pending(
198        &self,
199        session_id: Option<String>,
200    ) -> impl Stream<Item = PendingResult> + Send + 'static {
201        let storage = self.storage.clone();
202
203        stream! {
204            match storage.list_pending(session_id.as_deref()).await {
205                Ok(approvals) => {
206                    yield PendingResult::Ok { approvals };
207                }
208                Err(e) => {
209                    yield PendingResult::Err { message: e };
210                }
211            }
212        }
213    }
214
215    /// Generate MCP configuration for a loopback session
216    #[plexus_macros::hub_method(params(
217        session_id = "Session ID for correlation"
218    ))]
219    async fn configure(
220        &self,
221        session_id: String,
222    ) -> impl Stream<Item = ConfigureResult> + Send + 'static {
223        let mcp_url = self.mcp_url.clone();
224
225        stream! {
226            // Include session_id in env config for correlation
227            let config = json!({
228                "mcpServers": {
229                    "plexus": {
230                        "type": "http",
231                        "url": mcp_url
232                    }
233                },
234                "env": {
235                    "LOOPBACK_SESSION_ID": session_id
236                }
237            });
238
239            yield ConfigureResult::Ok { mcp_config: config };
240        }
241    }
242}