plexus_substrate/activations/claudecode_loopback/
activation.rs1use super::storage::{LoopbackStorage, LoopbackStorageConfig};
2use super::types::*;
3use async_stream::stream;
4use futures::Stream;
5use plexus_macros::hub_methods;
6use serde_json::{json, Value};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::time::sleep;
10
11#[derive(Clone)]
13pub struct ClaudeCodeLoopback {
14 storage: Arc<LoopbackStorage>,
15 mcp_url: String,
16}
17
18impl ClaudeCodeLoopback {
19 pub async fn new(config: LoopbackStorageConfig) -> Result<Self, String> {
20 let storage = LoopbackStorage::new(config).await?;
21 let mcp_url = std::env::var("PLEXUS_MCP_URL")
22 .unwrap_or_else(|_| "http://127.0.0.1:4445/mcp".to_string());
23
24 Ok(Self {
25 storage: Arc::new(storage),
26 mcp_url,
27 })
28 }
29
30 pub fn with_mcp_url(mut self, url: String) -> Self {
31 self.mcp_url = url;
32 self
33 }
34
35 pub fn storage(&self) -> Arc<LoopbackStorage> {
37 self.storage.clone()
38 }
39}
40
41#[hub_methods(
42 namespace = "loopback",
43 version = "1.0.0",
44 description = "Route tool permissions to parent for approval"
45)]
46impl ClaudeCodeLoopback {
47 #[plexus_macros::hub_method(params(
56 tool_name = "Name of the tool being requested",
57 tool_use_id = "Unique ID for this tool invocation",
58 input = "Tool input parameters"
59 ))]
60 async fn permit(
61 &self,
62 tool_name: String,
63 tool_use_id: String,
64 input: Value,
65 ) -> impl Stream<Item = String> + Send + 'static {
66 eprintln!("[LOOPBACK] permit called: tool={}, tool_use_id={}", tool_name, tool_use_id);
68
69 let storage = self.storage.clone();
70
71 let session_id = storage.lookup_session_by_tool(&tool_use_id)
74 .unwrap_or_else(|| "unknown".to_string());
75
76 stream! {
77 eprintln!("[LOOPBACK] permit: tool_use_id={} mapped to session_id={}", tool_use_id, session_id);
79
80 let approval = match storage.create_approval(
82 &session_id,
83 &tool_name,
84 &tool_use_id,
85 &input,
86 ).await {
87 Ok(a) => a,
88 Err(e) => {
89 let response = json!({
91 "behavior": "deny",
92 "message": format!("Failed to create approval: {}", e)
93 });
94 yield response.to_string();
95 return;
96 }
97 };
98
99 let approval_id = approval.id;
100 let timeout_secs = 300u64; let poll_interval = Duration::from_secs(1);
102 let start = std::time::Instant::now();
103
104 loop {
106 if start.elapsed().as_secs() > timeout_secs {
108 let _ = storage.resolve_approval(&approval_id, false, Some("Timed out".to_string())).await;
109 let response = json!({
110 "behavior": "deny",
111 "message": "Approval request timed out"
112 });
113 yield response.to_string();
114 return;
115 }
116
117 match storage.get_approval(&approval_id).await {
119 Ok(current) => {
120 match current.status {
121 ApprovalStatus::Approved => {
122 let response = json!({
125 "behavior": "allow",
126 "updatedInput": input.clone()
127 });
128 yield response.to_string();
129 return;
130 }
131 ApprovalStatus::Denied => {
132 let response = json!({
133 "behavior": "deny",
134 "message": current.response_message.unwrap_or_else(|| "Denied by parent".to_string())
135 });
136 yield response.to_string();
137 return;
138 }
139 ApprovalStatus::TimedOut => {
140 let response = json!({
141 "behavior": "deny",
142 "message": "Approval timed out"
143 });
144 yield response.to_string();
145 return;
146 }
147 ApprovalStatus::Pending => {
148 }
150 }
151 }
152 Err(e) => {
153 let response = json!({
154 "behavior": "deny",
155 "message": format!("Failed to check approval: {}", e)
156 });
157 yield response.to_string();
158 return;
159 }
160 }
161
162 sleep(poll_interval).await;
163 }
164 }
165 }
166
167 #[plexus_macros::hub_method(params(
169 approval_id = "ID of the approval request",
170 approve = "Whether to approve (true) or deny (false)",
171 message = "Optional message/reason"
172 ))]
173 async fn respond(
174 &self,
175 approval_id: ApprovalId,
176 approve: bool,
177 message: Option<String>,
178 ) -> impl Stream<Item = RespondResult> + Send + 'static {
179 let storage = self.storage.clone();
180
181 stream! {
182 match storage.resolve_approval(&approval_id, approve, message).await {
183 Ok(()) => {
184 yield RespondResult::Ok { approval_id };
185 }
186 Err(e) => {
187 yield RespondResult::Err { message: e };
188 }
189 }
190 }
191 }
192
193 #[plexus_macros::hub_method(params(
195 session_id = "Optional session ID to filter by"
196 ))]
197 async fn pending(
198 &self,
199 session_id: Option<String>,
200 ) -> impl Stream<Item = PendingResult> + Send + 'static {
201 let storage = self.storage.clone();
202
203 stream! {
204 match storage.list_pending(session_id.as_deref()).await {
205 Ok(approvals) => {
206 yield PendingResult::Ok { approvals };
207 }
208 Err(e) => {
209 yield PendingResult::Err { message: e };
210 }
211 }
212 }
213 }
214
215 #[plexus_macros::hub_method(params(
217 session_id = "Session ID for correlation"
218 ))]
219 async fn configure(
220 &self,
221 session_id: String,
222 ) -> impl Stream<Item = ConfigureResult> + Send + 'static {
223 let mcp_url = self.mcp_url.clone();
224
225 stream! {
226 let config = json!({
228 "mcpServers": {
229 "plexus": {
230 "type": "http",
231 "url": mcp_url
232 }
233 },
234 "env": {
235 "LOOPBACK_SESSION_ID": session_id
236 }
237 });
238
239 yield ConfigureResult::Ok { mcp_config: config };
240 }
241 }
242}