Skip to main content

plexus_substrate/activations/claudecode_loopback/
activation.rs

1use super::storage::{LoopbackStorage, LoopbackStorageConfig};
2use super::types::*;
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::activation;
6use serde_json::{json, Value};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::time::sleep;
10
11/// ClaudeCode Loopback - routes tool permissions back to parent for approval
12#[derive(Clone)]
13pub struct ClaudeCodeLoopback {
14    storage: Arc<LoopbackStorage>,
15    mcp_url: String,
16}
17
18impl ClaudeCodeLoopback {
19    pub async fn new(config: LoopbackStorageConfig) -> Result<Self, String> {
20        let storage = LoopbackStorage::new(config).await?;
21        let mcp_url = std::env::var("PLEXUS_MCP_URL")
22            .unwrap_or_else(|_| "http://127.0.0.1:4445/mcp".to_string());
23
24        Ok(Self {
25            storage: Arc::new(storage),
26            mcp_url,
27        })
28    }
29
30    pub fn with_mcp_url(mut self, url: String) -> Self {
31        self.mcp_url = url;
32        self
33    }
34
35    /// Get the underlying storage (for sharing with ClaudeCode)
36    pub fn storage(&self) -> Arc<LoopbackStorage> {
37        self.storage.clone()
38    }
39}
40
41#[plexus_macros::activation(namespace = "loopback",
42version = "1.0.0",
43description = "Route tool permissions to parent for approval", crate_path = "plexus_core")]
44impl ClaudeCodeLoopback {
45    /// Permission prompt handler - blocks until parent approves/denies
46    ///
47    /// This is called by Claude Code CLI via --permission-prompt-tool.
48    /// It blocks (polls) until the parent calls loopback.respond().
49    ///
50    /// Returns a JSON string (not object) because Claude Code expects the MCP response
51    /// to have the permission JSON already stringified in content[0].text.
52    /// See: https://github.com/anthropics/claude-code/blob/main/docs/permission-prompt-tool.md
53    #[plexus_macros::method(params(
54        tool_name = "Name of the tool being requested",
55        tool_use_id = "Unique ID for this tool invocation",
56        input = "Tool input parameters",
57        _connection = "HTTP connection metadata (optional)" // Added for transparent query param forwarding
58    ))]
59    async fn permit(
60        &self,
61        tool_name: String,
62        tool_use_id: String,
63        input: Value,
64        _connection: Option<Value>,
65    ) -> impl Stream<Item = String> + Send + 'static {
66        // IMMEDIATE DEBUG: Log before stream starts
67        tracing::debug!("[LOOPBACK] permit called: tool={}, tool_use_id={}", tool_name, tool_use_id);
68
69        let storage = self.storage.clone();
70
71        // Try to get session_id from HTTP connection metadata first (transparent approach).
72        // If not available, fall back to tool_use_id mapping (legacy approach).
73        let session_id = _connection
74            .as_ref()
75            .and_then(|conn| conn.get("query.session_id"))
76            .and_then(|v| v.as_str())
77            .map(|s| s.to_string())
78            .or_else(|| std::env::var("PLEXUS_SESSION_ID").ok())
79            .or_else(|| storage.lookup_session_by_tool(&tool_use_id))
80            .unwrap_or_else(|| "unknown".to_string());
81
82        stream! {
83            // DEBUG: Log the lookup result
84            tracing::debug!("[LOOPBACK] permit: tool_use_id={} mapped to session_id={}", tool_use_id, session_id);
85
86            // Create approval request
87            let approval = match storage.create_approval(
88                &session_id,
89                &tool_name,
90                &tool_use_id,
91                &input,
92            ).await {
93                Ok(a) => a,
94                Err(e) => {
95                    // Return deny response as JSON string
96                    let response = json!({
97                        "behavior": "deny",
98                        "message": format!("Failed to create approval: {}", e)
99                    });
100                    yield response.to_string();
101                    return;
102                }
103            };
104
105            let approval_id = approval.id;
106            let timeout_secs = 300u64; // 5 minute timeout
107            let poll_interval = Duration::from_secs(1);
108            let start = std::time::Instant::now();
109
110            // Blocking poll loop - like HumanLayer's hlyr
111            loop {
112                // Check timeout
113                if start.elapsed().as_secs() > timeout_secs {
114                    let _ = storage.resolve_approval(&approval_id, false, Some("Timed out".to_string())).await;
115                    let response = json!({
116                        "behavior": "deny",
117                        "message": "Approval request timed out"
118                    });
119                    yield response.to_string();
120                    return;
121                }
122
123                // Poll for resolution
124                match storage.get_approval(&approval_id).await {
125                    Ok(current) => {
126                        match current.status {
127                            ApprovalStatus::Approved => {
128                                // Return allow response as JSON string
129                                // Claude Code expects: {"behavior": "allow", "updatedInput": {...}}
130                                let response = json!({
131                                    "behavior": "allow",
132                                    "updatedInput": input.clone()
133                                });
134                                yield response.to_string();
135                                return;
136                            }
137                            ApprovalStatus::Denied => {
138                                let response = json!({
139                                    "behavior": "deny",
140                                    "message": current.response_message.unwrap_or_else(|| "Denied by parent".to_string())
141                                });
142                                yield response.to_string();
143                                return;
144                            }
145                            ApprovalStatus::TimedOut => {
146                                let response = json!({
147                                    "behavior": "deny",
148                                    "message": "Approval timed out"
149                                });
150                                yield response.to_string();
151                                return;
152                            }
153                            ApprovalStatus::Pending => {
154                                // Continue polling
155                            }
156                        }
157                    }
158                    Err(e) => {
159                        let response = json!({
160                            "behavior": "deny",
161                            "message": format!("Failed to check approval: {}", e)
162                        });
163                        yield response.to_string();
164                        return;
165                    }
166                }
167
168                sleep(poll_interval).await;
169            }
170        }
171    }
172
173    /// Respond to a pending approval request
174    #[plexus_macros::method(params(
175        approval_id = "ID of the approval request",
176        approve = "Whether to approve (true) or deny (false)",
177        message = "Optional message/reason"
178    ))]
179    async fn respond(
180        &self,
181        approval_id: ApprovalId,
182        approve: bool,
183        message: Option<String>,
184    ) -> impl Stream<Item = RespondResult> + Send + 'static {
185        let storage = self.storage.clone();
186
187        stream! {
188            match storage.resolve_approval(&approval_id, approve, message).await {
189                Ok(()) => {
190                    yield RespondResult::Ok { approval_id };
191                }
192                Err(e) => {
193                    yield RespondResult::Err { message: e.to_string() };
194                }
195            }
196        }
197    }
198
199    /// List pending approval requests
200    #[plexus_macros::method(params(
201        session_id = "Optional session ID to filter by"
202    ))]
203    async fn pending(
204        &self,
205        session_id: Option<String>,
206    ) -> impl Stream<Item = PendingResult> + Send + 'static {
207        let storage = self.storage.clone();
208
209        stream! {
210            match storage.list_pending(session_id.as_deref()).await {
211                Ok(approvals) => {
212                    yield PendingResult::Ok { approvals };
213                }
214                Err(e) => {
215                    yield PendingResult::Err { message: e.to_string() };
216                }
217            }
218        }
219    }
220
221    /// Wait for a new approval request to arrive for a session
222    ///
223    /// This method blocks until a new approval arrives or the timeout is reached.
224    /// Unlike `pending` which returns a snapshot, this waits for new approvals
225    /// and returns immediately when one arrives.
226    ///
227    /// Use case: Claude Code can call this once and block, eliminating polling overhead.
228    #[plexus_macros::method(params(
229        session_id = "Session ID to wait for approvals",
230        timeout_secs = "Optional timeout in seconds (default: 300 = 5 minutes)"
231    ))]
232    async fn wait_for_approval(
233        &self,
234        session_id: String,
235        timeout_secs: Option<u64>,
236    ) -> impl Stream<Item = WaitForApprovalResult> + Send + 'static {
237        let storage = self.storage.clone();
238        let timeout = Duration::from_secs(timeout_secs.unwrap_or(300));
239
240        stream! {
241            // Get or create notifier for this session
242            let notifier = storage.get_or_create_notifier(&session_id);
243
244            // Record start time for timeout
245            let start = std::time::Instant::now();
246
247            loop {
248                // First check if there are already pending approvals
249                match storage.list_pending(Some(&session_id)).await {
250                    Ok(approvals) if !approvals.is_empty() => {
251                        // Found pending approval(s), return immediately
252                        yield WaitForApprovalResult::Ok { approvals };
253                        return;
254                    }
255                    Err(e) => {
256                        yield WaitForApprovalResult::Err {
257                            message: format!("Failed to check pending approvals: {}", e)
258                        };
259                        return;
260                    }
261                    _ => {
262                        // No pending approvals, continue to wait
263                    }
264                }
265
266                // Check if we've exceeded timeout
267                if start.elapsed() >= timeout {
268                    yield WaitForApprovalResult::Timeout {
269                        message: format!("No approval received within {} seconds", timeout.as_secs())
270                    };
271                    return;
272                }
273
274                // Wait for notification or timeout
275                // Use tokio::select! to race between notification and timeout
276                tokio::select! {
277                    _ = notifier.notified() => {
278                        // New approval arrived, loop will check pending again
279                        continue;
280                    }
281                    _ = sleep(timeout.saturating_sub(start.elapsed())) => {
282                        // Timeout reached
283                        yield WaitForApprovalResult::Timeout {
284                            message: format!("No approval received within {} seconds", timeout.as_secs())
285                        };
286                        return;
287                    }
288                }
289            }
290        }
291    }
292
293    /// Generate MCP configuration for a loopback session
294    #[plexus_macros::method(params(
295        session_id = "Session ID for correlation"
296    ))]
297    async fn configure(
298        &self,
299        session_id: String,
300    ) -> impl Stream<Item = ConfigureResult> + Send + 'static {
301        let mcp_url = self.mcp_url.clone();
302
303        stream! {
304            // Include session_id in env config for correlation
305            let config = json!({
306                "mcpServers": {
307                    "plexus": {
308                        "type": "http",
309                        "url": mcp_url
310                    }
311                },
312                "env": {
313                    "LOOPBACK_SESSION_ID": session_id
314                }
315            });
316
317            yield ConfigureResult::Ok { mcp_config: config };
318        }
319    }
320}