1use super::protocol::*;
7use async_trait::async_trait;
8use serde_json::{json, Value};
9use std::process::Stdio;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13use tap_mcp::mcp::protocol::ToolContent;
14use tap_mcp::tools::ToolRegistry;
15use tap_node::event::{EventSubscriber, NodeEvent};
16use tap_node::state_machine::fsm::{DecisionHandler, TransactionContext, TransactionState};
17use tap_node::storage::{DecisionStatus, DecisionType, Storage};
18use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
19use tokio::process::Command;
20use tokio::sync::{mpsc, Mutex, RwLock};
21use tracing::{debug, error, info, warn};
22
23#[derive(Debug, Clone, PartialEq, Eq)]
25pub enum SubscribeMode {
26 Decisions,
28 All,
30}
31
32impl std::str::FromStr for SubscribeMode {
33 type Err = String;
34
35 fn from_str(s: &str) -> Result<Self, Self::Err> {
36 match s {
37 "decisions" => Ok(SubscribeMode::Decisions),
38 "all" => Ok(SubscribeMode::All),
39 _ => Err(format!(
40 "Invalid subscribe mode: {}. Expected 'decisions' or 'all'",
41 s
42 )),
43 }
44 }
45}
46
47#[derive(Debug, Clone)]
49pub struct ExternalDecisionConfig {
50 pub exec_path: String,
52 pub exec_args: Vec<String>,
54 pub subscribe_mode: SubscribeMode,
56}
57
58pub struct ExternalDecisionManager {
60 config: ExternalDecisionConfig,
61 agent_dids: Vec<String>,
62 tool_registry: Arc<ToolRegistry>,
63 storage: Arc<Storage>,
64 stdin_tx: Arc<RwLock<Option<mpsc::Sender<String>>>>,
66 is_running: AtomicBool,
68 pending_responses:
70 Arc<Mutex<std::collections::HashMap<i64, tokio::sync::oneshot::Sender<Value>>>>,
71 management_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
73}
74
75impl std::fmt::Debug for ExternalDecisionManager {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("ExternalDecisionManager")
78 .field("config", &self.config)
79 .field("agent_dids", &self.agent_dids)
80 .field("is_running", &self.is_running.load(Ordering::Relaxed))
81 .finish()
82 }
83}
84
85impl ExternalDecisionManager {
86 pub fn new(
88 config: ExternalDecisionConfig,
89 agent_dids: Vec<String>,
90 tool_registry: Arc<ToolRegistry>,
91 storage: Arc<Storage>,
92 ) -> Self {
93 Self {
94 config,
95 agent_dids,
96 tool_registry,
97 storage,
98 stdin_tx: Arc::new(RwLock::new(None)),
99 is_running: AtomicBool::new(false),
100 pending_responses: Arc::new(Mutex::new(std::collections::HashMap::new())),
101 management_handle: Mutex::new(None),
102 }
103 }
104
105 pub async fn start(self: &Arc<Self>) {
107 let this = Arc::clone(self);
108 let handle = tokio::spawn(async move {
109 this.run_process_loop().await;
110 });
111 *self.management_handle.lock().await = Some(handle);
112 }
113
114 pub async fn shutdown(&self) {
116 info!("Shutting down external decision manager");
117 self.is_running.store(false, Ordering::SeqCst);
118
119 {
121 let mut tx = self.stdin_tx.write().await;
122 *tx = None;
123 }
124
125 if let Some(handle) = self.management_handle.lock().await.take() {
127 handle.abort();
128 }
129 }
130
131 async fn run_process_loop(&self) {
133 let mut backoff_secs = 1u64;
134 let max_backoff = 30u64;
135
136 loop {
137 info!(
138 "Spawning external decision process: {} {:?}",
139 self.config.exec_path, self.config.exec_args
140 );
141
142 match self.spawn_and_run().await {
143 Ok(()) => {
144 info!("External decision process exited normally");
145 }
146 Err(e) => {
147 error!("External decision process error: {}", e);
148 }
149 }
150
151 self.is_running.store(false, Ordering::SeqCst);
153 {
154 let mut tx = self.stdin_tx.write().await;
155 *tx = None;
156 }
157
158 info!("Restarting external decision process in {}s", backoff_secs);
160 tokio::time::sleep(Duration::from_secs(backoff_secs)).await;
161
162 backoff_secs = (backoff_secs * 2).min(max_backoff);
164 }
165 }
166
167 async fn spawn_and_run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
169 let mut child = Command::new(&self.config.exec_path)
170 .args(&self.config.exec_args)
171 .stdin(Stdio::piped())
172 .stdout(Stdio::piped())
173 .stderr(Stdio::inherit()) .spawn()?;
175
176 let stdin = child.stdin.take().ok_or("Failed to open stdin")?;
177 let stdout = child.stdout.take().ok_or("Failed to open stdout")?;
178
179 let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(256);
181 {
182 let mut tx = self.stdin_tx.write().await;
183 *tx = Some(stdin_tx);
184 }
185 self.is_running.store(true, Ordering::SeqCst);
186
187 let stdin_handle = tokio::spawn(async move {
189 let mut stdin = stdin;
190 while let Some(line) = stdin_rx.recv().await {
191 if stdin.write_all(line.as_bytes()).await.is_err() {
192 break;
193 }
194 if stdin.write_all(b"\n").await.is_err() {
195 break;
196 }
197 if stdin.flush().await.is_err() {
198 break;
199 }
200 }
201 });
202
203 let tool_registry = Arc::clone(&self.tool_registry);
205 let pending_responses = Arc::clone(&self.pending_responses);
206 let storage = Arc::clone(&self.storage);
207 let stdin_tx_clone = Arc::clone(&self.stdin_tx);
208 let stdout_handle = tokio::spawn(async move {
209 let reader = BufReader::new(stdout);
210 let mut lines = reader.lines();
211
212 while let Ok(Some(line)) = lines.next_line().await {
213 if line.trim().is_empty() {
214 continue;
215 }
216
217 debug!("Received from external process: {}", line);
218
219 Self::handle_stdout_message(
220 &line,
221 &tool_registry,
222 &pending_responses,
223 &storage,
224 &stdin_tx_clone,
225 )
226 .await;
227 }
228 debug!("External process stdout closed");
229 });
230
231 self.send_initialize().await;
233
234 self.replay_pending_decisions().await;
236
237 let status = child.wait().await?;
239 info!("External decision process exited with status: {}", status);
240
241 stdin_handle.abort();
243 stdout_handle.abort();
244
245 Ok(())
246 }
247
248 async fn send_initialize(&self) {
250 let params = InitializeParams {
251 version: env!("CARGO_PKG_VERSION").to_string(),
252 agent_dids: self.agent_dids.clone(),
253 subscribe_mode: match self.config.subscribe_mode {
254 SubscribeMode::Decisions => "decisions".to_string(),
255 SubscribeMode::All => "all".to_string(),
256 },
257 capabilities: InitializeCapabilities {
258 tools: true,
259 decisions: true,
260 },
261 };
262
263 let notif = JsonRpcNotification::new(
264 "tap/initialize",
265 Some(serde_json::to_value(¶ms).unwrap()),
266 );
267
268 self.send_line(&serde_json::to_string(¬if).unwrap())
269 .await;
270 }
271
272 async fn replay_pending_decisions(&self) {
274 for did in &self.agent_dids {
276 match self
278 .storage
279 .list_decisions(Some(did), Some(DecisionStatus::Pending), None, 1000)
280 .await
281 {
282 Ok(entries) => {
283 for entry in entries {
284 self.send_decision_request(&entry).await;
285 }
286 }
287 Err(e) => {
288 error!("Failed to list pending decisions for {}: {}", did, e);
289 }
290 }
291
292 match self
294 .storage
295 .list_decisions(Some(did), Some(DecisionStatus::Delivered), None, 1000)
296 .await
297 {
298 Ok(entries) => {
299 for entry in entries {
300 self.send_decision_request(&entry).await;
301 }
302 }
303 Err(e) => {
304 error!("Failed to list delivered decisions for {}: {}", did, e);
305 }
306 }
307 }
308 }
309
310 async fn send_decision_request(&self, entry: &tap_node::storage::DecisionLogEntry) {
312 let params = DecisionRequestParams {
313 decision_id: entry.id,
314 transaction_id: entry.transaction_id.clone(),
315 agent_did: entry.agent_did.clone(),
316 decision_type: entry.decision_type.to_string(),
317 context: entry.context_json.clone(),
318 created_at: entry.created_at.clone(),
319 };
320
321 let req = JsonRpcRequest::new(
322 entry.id,
323 "tap/decision",
324 Some(serde_json::to_value(¶ms).unwrap()),
325 );
326
327 let line = serde_json::to_string(&req).unwrap();
328 self.send_line(&line).await;
329
330 if entry.status == DecisionStatus::Pending {
332 if let Err(e) = self
333 .storage
334 .update_decision_status(entry.id, DecisionStatus::Delivered, None, None)
335 .await
336 {
337 error!("Failed to mark decision {} as delivered: {}", entry.id, e);
338 }
339 }
340 }
341
342 async fn handle_stdout_message(
344 line: &str,
345 tool_registry: &ToolRegistry,
346 pending_responses: &Mutex<
347 std::collections::HashMap<i64, tokio::sync::oneshot::Sender<Value>>,
348 >,
349 _storage: &Storage,
350 stdin_tx: &RwLock<Option<mpsc::Sender<String>>>,
351 ) {
352 match serde_json::from_str::<IncomingMessage>(line) {
354 Ok(IncomingMessage::Request(req)) => {
355 Self::handle_tool_call(req, tool_registry, pending_responses, stdin_tx).await;
356 }
357 Ok(IncomingMessage::Notification(notif)) => {
358 debug!(
359 "Received notification from external process: {}",
360 notif.method
361 );
362 }
364 Err(_) => {
365 if let Ok(resp) = serde_json::from_str::<JsonRpcResponse>(line) {
367 let id = resp.id.as_i64().unwrap_or(-1);
368 let mut pending = pending_responses.lock().await;
369 if let Some(sender) = pending.remove(&id) {
370 let _ = sender.send(resp.result);
371 }
372 } else {
373 warn!("Unrecognized message from external process: {}", line);
374 }
375 }
376 }
377 }
378
379 async fn handle_tool_call(
381 req: JsonRpcRequest,
382 tool_registry: &ToolRegistry,
383 _pending_responses: &Mutex<
384 std::collections::HashMap<i64, tokio::sync::oneshot::Sender<Value>>,
385 >,
386 stdin_tx: &RwLock<Option<mpsc::Sender<String>>>,
387 ) {
388 let response = match req.method.as_str() {
389 "tools/call" => {
390 let params = req.params.unwrap_or(json!({}));
391 let tool_name = params["name"].as_str().unwrap_or("");
392 let arguments = params.get("arguments").cloned();
393
394 debug!("External process calling tool: {}", tool_name);
395
396 match tool_registry.call_tool(tool_name, arguments).await {
397 Ok(result) => JsonRpcResponse::new(
398 req.id,
399 json!({
400 "content": result.content.iter().map(|c| match c {
401 ToolContent::Text { text } => json!({"type": "text", "text": text}),
402 _ => json!({"type": "unknown"}),
403 }).collect::<Vec<_>>(),
404 "isError": result.is_error.unwrap_or(false),
405 }),
406 ),
407 Err(e) => {
408 error!("Tool call failed: {}", e);
409 JsonRpcResponse::new(
410 req.id,
411 json!({
412 "content": [{"type": "text", "text": format!("Tool call failed: {}", e)}],
413 "isError": true,
414 }),
415 )
416 }
417 }
418 }
419 "tools/list" => {
420 let tools = tool_registry.list_tools();
421 JsonRpcResponse::new(req.id, json!({ "tools": tools }))
422 }
423 _ => {
424 warn!("Unknown method from external process: {}", req.method);
425 return;
426 }
427 };
428
429 if let Ok(response_str) = serde_json::to_string(&response) {
431 let tx = stdin_tx.read().await;
432 if let Some(tx) = tx.as_ref() {
433 if let Err(e) = tx.send(response_str).await {
434 debug!("Failed to send tool response to external process: {}", e);
435 }
436 }
437 }
438 }
439
440 async fn send_line(&self, line: &str) {
442 let tx = self.stdin_tx.read().await;
443 if let Some(tx) = tx.as_ref() {
444 if let Err(e) = tx.send(line.to_string()).await {
445 debug!("Failed to send to stdin (process may be down): {}", e);
446 }
447 }
448 }
449}
450
451#[async_trait]
453impl DecisionHandler for ExternalDecisionManager {
454 async fn handle_decision(
455 &self,
456 ctx: &TransactionContext,
457 decision: &tap_node::state_machine::fsm::Decision,
458 ) {
459 let (decision_type, context_json) = match decision {
460 tap_node::state_machine::fsm::Decision::AuthorizationRequired {
461 transaction_id,
462 pending_agents,
463 } => (
464 DecisionType::AuthorizationRequired,
465 json!({
466 "transaction_state": ctx.state.to_string(),
467 "pending_agents": pending_agents,
468 "transaction_id": transaction_id,
469 }),
470 ),
471 tap_node::state_machine::fsm::Decision::PolicySatisfactionRequired {
472 transaction_id,
473 requested_by,
474 } => (
475 DecisionType::PolicySatisfactionRequired,
476 json!({
477 "transaction_state": ctx.state.to_string(),
478 "requested_by": requested_by,
479 "transaction_id": transaction_id,
480 }),
481 ),
482 tap_node::state_machine::fsm::Decision::SettlementRequired { transaction_id } => (
483 DecisionType::SettlementRequired,
484 json!({
485 "transaction_state": ctx.state.to_string(),
486 "transaction_id": transaction_id,
487 }),
488 ),
489 };
490
491 let agent_did = self.agent_dids.first().cloned().unwrap_or_default();
492
493 match self
495 .storage
496 .insert_decision(
497 &ctx.transaction_id,
498 &agent_did,
499 decision_type,
500 &context_json,
501 )
502 .await
503 {
504 Ok(decision_id) => {
505 debug!(
506 "Inserted decision {} for transaction {}",
507 decision_id, ctx.transaction_id
508 );
509
510 if self.is_running.load(Ordering::Relaxed) {
512 let entry = self.storage.get_decision_by_id(decision_id).await;
513 if let Ok(Some(entry)) = entry {
514 self.send_decision_request(&entry).await;
515 }
516 }
517 }
518 Err(e) => {
519 error!(
520 "Failed to insert decision for transaction {}: {}",
521 ctx.transaction_id, e
522 );
523 }
524 }
525 }
526}
527
528#[async_trait]
530impl EventSubscriber for ExternalDecisionManager {
531 async fn handle_event(&self, event: NodeEvent) {
532 if let NodeEvent::TransactionStateChanged {
537 ref transaction_id,
538 ref new_state,
539 ..
540 } = event
541 {
542 if let Ok(state) = new_state.parse::<TransactionState>() {
543 if state.is_terminal() {
544 if let Err(e) = self
545 .storage
546 .expire_decisions_for_transaction(transaction_id)
547 .await
548 {
549 error!(
550 "Failed to expire decisions for transaction {}: {}",
551 transaction_id, e
552 );
553 }
554 }
555 }
556 }
557
558 if self.config.subscribe_mode == SubscribeMode::All
560 && self.is_running.load(Ordering::Relaxed)
561 {
562 let (event_type, agent_did, data) = match &event {
563 NodeEvent::PlainMessageReceived { message } => {
564 ("message_received", None, message.clone())
565 }
566 NodeEvent::PlainMessageSent { message, from, to } => (
567 "message_sent",
568 Some(from.clone()),
569 json!({"message": message, "to": to}),
570 ),
571 NodeEvent::TransactionStateChanged {
572 transaction_id,
573 old_state,
574 new_state,
575 agent_did,
576 } => (
577 "transaction_state_changed",
578 agent_did.clone(),
579 json!({
580 "transaction_id": transaction_id,
581 "old_state": old_state,
582 "new_state": new_state,
583 }),
584 ),
585 NodeEvent::CustomerUpdated {
586 customer_id,
587 agent_did,
588 update_type,
589 } => (
590 "customer_updated",
591 Some(agent_did.clone()),
592 json!({
593 "customer_id": customer_id,
594 "update_type": update_type,
595 }),
596 ),
597 NodeEvent::MessageReceived { message, source } => (
598 "message_received",
599 None,
600 json!({
601 "message_id": message.id,
602 "message_type": message.type_,
603 "from": message.from,
604 "source": source,
605 }),
606 ),
607 NodeEvent::MessageSent {
608 message,
609 destination,
610 } => (
611 "message_sent",
612 None,
613 json!({
614 "message_id": message.id,
615 "message_type": message.type_,
616 "destination": destination,
617 }),
618 ),
619 _ => return, };
621
622 let params = EventNotificationParams {
623 event_type: event_type.to_string(),
624 agent_did,
625 data,
626 timestamp: chrono::Utc::now().to_rfc3339(),
627 };
628
629 let notif =
630 JsonRpcNotification::new("tap/event", Some(serde_json::to_value(¶ms).unwrap()));
631
632 self.send_line(&serde_json::to_string(¬if).unwrap())
633 .await;
634 }
635 }
636}