plexus_substrate/activations/claudecode_loopback/
activation.rs1use super::storage::{LoopbackStorage, LoopbackStorageConfig};
2use super::types::*;
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::activation;
6use serde_json::{json, Value};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::time::sleep;
10
11#[derive(Clone)]
13pub struct ClaudeCodeLoopback {
14 storage: Arc<LoopbackStorage>,
15 mcp_url: String,
16}
17
18impl ClaudeCodeLoopback {
19 pub async fn new(config: LoopbackStorageConfig) -> Result<Self, String> {
20 let storage = LoopbackStorage::new(config).await?;
21 let mcp_url = std::env::var("PLEXUS_MCP_URL")
22 .unwrap_or_else(|_| "http://127.0.0.1:4445/mcp".to_string());
23
24 Ok(Self {
25 storage: Arc::new(storage),
26 mcp_url,
27 })
28 }
29
30 pub fn with_mcp_url(mut self, url: String) -> Self {
31 self.mcp_url = url;
32 self
33 }
34
35 pub fn storage(&self) -> Arc<LoopbackStorage> {
37 self.storage.clone()
38 }
39}
40
41#[plexus_macros::activation(namespace = "loopback",
42version = "1.0.0",
43description = "Route tool permissions to parent for approval", crate_path = "plexus_core")]
44impl ClaudeCodeLoopback {
45 #[plexus_macros::method(params(
54 tool_name = "Name of the tool being requested",
55 tool_use_id = "Unique ID for this tool invocation",
56 input = "Tool input parameters",
57 _connection = "HTTP connection metadata (optional)" ))]
59 async fn permit(
60 &self,
61 tool_name: String,
62 tool_use_id: String,
63 input: Value,
64 _connection: Option<Value>,
65 ) -> impl Stream<Item = String> + Send + 'static {
66 tracing::debug!("[LOOPBACK] permit called: tool={}, tool_use_id={}", tool_name, tool_use_id);
68
69 let storage = self.storage.clone();
70
71 let session_id = _connection
74 .as_ref()
75 .and_then(|conn| conn.get("query.session_id"))
76 .and_then(|v| v.as_str())
77 .map(|s| s.to_string())
78 .or_else(|| std::env::var("PLEXUS_SESSION_ID").ok())
79 .or_else(|| storage.lookup_session_by_tool(&tool_use_id))
80 .unwrap_or_else(|| "unknown".to_string());
81
82 stream! {
83 tracing::debug!("[LOOPBACK] permit: tool_use_id={} mapped to session_id={}", tool_use_id, session_id);
85
86 let approval = match storage.create_approval(
88 &session_id,
89 &tool_name,
90 &tool_use_id,
91 &input,
92 ).await {
93 Ok(a) => a,
94 Err(e) => {
95 let response = json!({
97 "behavior": "deny",
98 "message": format!("Failed to create approval: {}", e)
99 });
100 yield response.to_string();
101 return;
102 }
103 };
104
105 let approval_id = approval.id;
106 let timeout_secs = 300u64; let poll_interval = Duration::from_secs(1);
108 let start = std::time::Instant::now();
109
110 loop {
112 if start.elapsed().as_secs() > timeout_secs {
114 let _ = storage.resolve_approval(&approval_id, false, Some("Timed out".to_string())).await;
115 let response = json!({
116 "behavior": "deny",
117 "message": "Approval request timed out"
118 });
119 yield response.to_string();
120 return;
121 }
122
123 match storage.get_approval(&approval_id).await {
125 Ok(current) => {
126 match current.status {
127 ApprovalStatus::Approved => {
128 let response = json!({
131 "behavior": "allow",
132 "updatedInput": input.clone()
133 });
134 yield response.to_string();
135 return;
136 }
137 ApprovalStatus::Denied => {
138 let response = json!({
139 "behavior": "deny",
140 "message": current.response_message.unwrap_or_else(|| "Denied by parent".to_string())
141 });
142 yield response.to_string();
143 return;
144 }
145 ApprovalStatus::TimedOut => {
146 let response = json!({
147 "behavior": "deny",
148 "message": "Approval timed out"
149 });
150 yield response.to_string();
151 return;
152 }
153 ApprovalStatus::Pending => {
154 }
156 }
157 }
158 Err(e) => {
159 let response = json!({
160 "behavior": "deny",
161 "message": format!("Failed to check approval: {}", e)
162 });
163 yield response.to_string();
164 return;
165 }
166 }
167
168 sleep(poll_interval).await;
169 }
170 }
171 }
172
173 #[plexus_macros::method(params(
175 approval_id = "ID of the approval request",
176 approve = "Whether to approve (true) or deny (false)",
177 message = "Optional message/reason"
178 ))]
179 async fn respond(
180 &self,
181 approval_id: ApprovalId,
182 approve: bool,
183 message: Option<String>,
184 ) -> impl Stream<Item = RespondResult> + Send + 'static {
185 let storage = self.storage.clone();
186
187 stream! {
188 match storage.resolve_approval(&approval_id, approve, message).await {
189 Ok(()) => {
190 yield RespondResult::Ok { approval_id };
191 }
192 Err(e) => {
193 yield RespondResult::Err { message: e.to_string() };
194 }
195 }
196 }
197 }
198
199 #[plexus_macros::method(params(
201 session_id = "Optional session ID to filter by"
202 ))]
203 async fn pending(
204 &self,
205 session_id: Option<String>,
206 ) -> impl Stream<Item = PendingResult> + Send + 'static {
207 let storage = self.storage.clone();
208
209 stream! {
210 match storage.list_pending(session_id.as_deref()).await {
211 Ok(approvals) => {
212 yield PendingResult::Ok { approvals };
213 }
214 Err(e) => {
215 yield PendingResult::Err { message: e.to_string() };
216 }
217 }
218 }
219 }
220
221 #[plexus_macros::method(params(
229 session_id = "Session ID to wait for approvals",
230 timeout_secs = "Optional timeout in seconds (default: 300 = 5 minutes)"
231 ))]
232 async fn wait_for_approval(
233 &self,
234 session_id: String,
235 timeout_secs: Option<u64>,
236 ) -> impl Stream<Item = WaitForApprovalResult> + Send + 'static {
237 let storage = self.storage.clone();
238 let timeout = Duration::from_secs(timeout_secs.unwrap_or(300));
239
240 stream! {
241 let notifier = storage.get_or_create_notifier(&session_id);
243
244 let start = std::time::Instant::now();
246
247 loop {
248 match storage.list_pending(Some(&session_id)).await {
250 Ok(approvals) if !approvals.is_empty() => {
251 yield WaitForApprovalResult::Ok { approvals };
253 return;
254 }
255 Err(e) => {
256 yield WaitForApprovalResult::Err {
257 message: format!("Failed to check pending approvals: {}", e)
258 };
259 return;
260 }
261 _ => {
262 }
264 }
265
266 if start.elapsed() >= timeout {
268 yield WaitForApprovalResult::Timeout {
269 message: format!("No approval received within {} seconds", timeout.as_secs())
270 };
271 return;
272 }
273
274 tokio::select! {
277 _ = notifier.notified() => {
278 continue;
280 }
281 _ = sleep(timeout.saturating_sub(start.elapsed())) => {
282 yield WaitForApprovalResult::Timeout {
284 message: format!("No approval received within {} seconds", timeout.as_secs())
285 };
286 return;
287 }
288 }
289 }
290 }
291 }
292
293 #[plexus_macros::method(params(
295 session_id = "Session ID for correlation"
296 ))]
297 async fn configure(
298 &self,
299 session_id: String,
300 ) -> impl Stream<Item = ConfigureResult> + Send + 'static {
301 let mcp_url = self.mcp_url.clone();
302
303 stream! {
304 let config = json!({
306 "mcpServers": {
307 "plexus": {
308 "type": "http",
309 "url": mcp_url
310 }
311 },
312 "env": {
313 "LOOPBACK_SESSION_ID": session_id
314 }
315 });
316
317 yield ConfigureResult::Ok { mcp_config: config };
318 }
319 }
320}