Skip to main content

tap_http/external_decision/
manager.rs

1//! External Decision Manager
2//!
3//! Manages the lifecycle of an external decision-making process. The child
4//! process communicates over stdin/stdout using JSON-RPC 2.0.
5
6use super::protocol::*;
7use async_trait::async_trait;
8use serde_json::{json, Value};
9use std::process::Stdio;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13use tap_mcp::mcp::protocol::ToolContent;
14use tap_mcp::tools::ToolRegistry;
15use tap_node::event::{EventSubscriber, NodeEvent};
16use tap_node::state_machine::fsm::{DecisionHandler, TransactionContext, TransactionState};
17use tap_node::storage::{DecisionStatus, DecisionType, Storage};
18use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
19use tokio::process::Command;
20use tokio::sync::{mpsc, Mutex, RwLock};
21use tracing::{debug, error, info, warn};
22
/// Selects which node traffic is pushed to the external decision process.
///
/// Parsed from the strings `"decisions"` and `"all"`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscribeMode {
    /// Send the process decision requests only.
    Decisions,
    /// Send decision requests and, in addition, `tap/event` notifications.
    All,
}
31
32impl std::str::FromStr for SubscribeMode {
33    type Err = String;
34
35    fn from_str(s: &str) -> Result<Self, Self::Err> {
36        match s {
37            "decisions" => Ok(SubscribeMode::Decisions),
38            "all" => Ok(SubscribeMode::All),
39            _ => Err(format!(
40                "Invalid subscribe mode: {}. Expected 'decisions' or 'all'",
41                s
42            )),
43        }
44    }
45}
46
47/// Configuration for the external decision executable
48#[derive(Debug, Clone)]
49pub struct ExternalDecisionConfig {
50    /// Path to the executable
51    pub exec_path: String,
52    /// Arguments to pass to the executable
53    pub exec_args: Vec<String>,
54    /// What events to forward
55    pub subscribe_mode: SubscribeMode,
56}
57
58/// Manages the external decision process lifecycle
59pub struct ExternalDecisionManager {
60    config: ExternalDecisionConfig,
61    agent_dids: Vec<String>,
62    tool_registry: Arc<ToolRegistry>,
63    storage: Arc<Storage>,
64    /// Channel for sending lines to stdin writer task
65    stdin_tx: Arc<RwLock<Option<mpsc::Sender<String>>>>,
66    /// Whether the process is currently running
67    is_running: AtomicBool,
68    /// Pending RPC responses — maps request_id to a oneshot sender
69    pending_responses:
70        Arc<Mutex<std::collections::HashMap<i64, tokio::sync::oneshot::Sender<Value>>>>,
71    /// Handle to the management task (for shutdown)
72    management_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
73}
74
75impl std::fmt::Debug for ExternalDecisionManager {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("ExternalDecisionManager")
78            .field("config", &self.config)
79            .field("agent_dids", &self.agent_dids)
80            .field("is_running", &self.is_running.load(Ordering::Relaxed))
81            .finish()
82    }
83}
84
85impl ExternalDecisionManager {
86    /// Create a new ExternalDecisionManager
87    pub fn new(
88        config: ExternalDecisionConfig,
89        agent_dids: Vec<String>,
90        tool_registry: Arc<ToolRegistry>,
91        storage: Arc<Storage>,
92    ) -> Self {
93        Self {
94            config,
95            agent_dids,
96            tool_registry,
97            storage,
98            stdin_tx: Arc::new(RwLock::new(None)),
99            is_running: AtomicBool::new(false),
100            pending_responses: Arc::new(Mutex::new(std::collections::HashMap::new())),
101            management_handle: Mutex::new(None),
102        }
103    }
104
105    /// Start the external process and management tasks
106    pub async fn start(self: &Arc<Self>) {
107        let this = Arc::clone(self);
108        let handle = tokio::spawn(async move {
109            this.run_process_loop().await;
110        });
111        *self.management_handle.lock().await = Some(handle);
112    }
113
114    /// Graceful shutdown
115    pub async fn shutdown(&self) {
116        info!("Shutting down external decision manager");
117        self.is_running.store(false, Ordering::SeqCst);
118
119        // Close stdin to signal the child
120        {
121            let mut tx = self.stdin_tx.write().await;
122            *tx = None;
123        }
124
125        // Cancel management task
126        if let Some(handle) = self.management_handle.lock().await.take() {
127            handle.abort();
128        }
129    }
130
131    /// Main process lifecycle loop with restart and backoff
132    async fn run_process_loop(&self) {
133        let mut backoff_secs = 1u64;
134        let max_backoff = 30u64;
135
136        loop {
137            info!(
138                "Spawning external decision process: {} {:?}",
139                self.config.exec_path, self.config.exec_args
140            );
141
142            match self.spawn_and_run().await {
143                Ok(()) => {
144                    info!("External decision process exited normally");
145                }
146                Err(e) => {
147                    error!("External decision process error: {}", e);
148                }
149            }
150
151            // Clear running state
152            self.is_running.store(false, Ordering::SeqCst);
153            {
154                let mut tx = self.stdin_tx.write().await;
155                *tx = None;
156            }
157
158            // Backoff before restart
159            info!("Restarting external decision process in {}s", backoff_secs);
160            tokio::time::sleep(Duration::from_secs(backoff_secs)).await;
161
162            // Increase backoff (cap at max)
163            backoff_secs = (backoff_secs * 2).min(max_backoff);
164        }
165    }
166
167    /// Spawn the process and run until it exits
168    async fn spawn_and_run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
169        let mut child = Command::new(&self.config.exec_path)
170            .args(&self.config.exec_args)
171            .stdin(Stdio::piped())
172            .stdout(Stdio::piped())
173            .stderr(Stdio::inherit()) // Forward stderr to tap-http log output
174            .spawn()?;
175
176        let stdin = child.stdin.take().ok_or("Failed to open stdin")?;
177        let stdout = child.stdout.take().ok_or("Failed to open stdout")?;
178
179        // Create stdin writer channel
180        let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(256);
181        {
182            let mut tx = self.stdin_tx.write().await;
183            *tx = Some(stdin_tx);
184        }
185        self.is_running.store(true, Ordering::SeqCst);
186
187        // Stdin writer task
188        let stdin_handle = tokio::spawn(async move {
189            let mut stdin = stdin;
190            while let Some(line) = stdin_rx.recv().await {
191                if stdin.write_all(line.as_bytes()).await.is_err() {
192                    break;
193                }
194                if stdin.write_all(b"\n").await.is_err() {
195                    break;
196                }
197                if stdin.flush().await.is_err() {
198                    break;
199                }
200            }
201        });
202
203        // Stdout reader task
204        let tool_registry = Arc::clone(&self.tool_registry);
205        let pending_responses = Arc::clone(&self.pending_responses);
206        let storage = Arc::clone(&self.storage);
207        let stdin_tx_clone = Arc::clone(&self.stdin_tx);
208        let stdout_handle = tokio::spawn(async move {
209            let reader = BufReader::new(stdout);
210            let mut lines = reader.lines();
211
212            while let Ok(Some(line)) = lines.next_line().await {
213                if line.trim().is_empty() {
214                    continue;
215                }
216
217                debug!("Received from external process: {}", line);
218
219                Self::handle_stdout_message(
220                    &line,
221                    &tool_registry,
222                    &pending_responses,
223                    &storage,
224                    &stdin_tx_clone,
225                )
226                .await;
227            }
228            debug!("External process stdout closed");
229        });
230
231        // Send initialization
232        self.send_initialize().await;
233
234        // Replay pending decisions
235        self.replay_pending_decisions().await;
236
237        // Wait for child to exit
238        let status = child.wait().await?;
239        info!("External decision process exited with status: {}", status);
240
241        // Clean up tasks
242        stdin_handle.abort();
243        stdout_handle.abort();
244
245        Ok(())
246    }
247
248    /// Send the initialization message
249    async fn send_initialize(&self) {
250        let params = InitializeParams {
251            version: env!("CARGO_PKG_VERSION").to_string(),
252            agent_dids: self.agent_dids.clone(),
253            subscribe_mode: match self.config.subscribe_mode {
254                SubscribeMode::Decisions => "decisions".to_string(),
255                SubscribeMode::All => "all".to_string(),
256            },
257            capabilities: InitializeCapabilities {
258                tools: true,
259                decisions: true,
260            },
261        };
262
263        let notif = JsonRpcNotification::new(
264            "tap/initialize",
265            Some(serde_json::to_value(&params).unwrap()),
266        );
267
268        self.send_line(&serde_json::to_string(&notif).unwrap())
269            .await;
270    }
271
272    /// Replay all pending/delivered decisions from the decision log
273    async fn replay_pending_decisions(&self) {
274        // Get all pending/delivered decisions across agents
275        for did in &self.agent_dids {
276            // List pending
277            match self
278                .storage
279                .list_decisions(Some(did), Some(DecisionStatus::Pending), None, 1000)
280                .await
281            {
282                Ok(entries) => {
283                    for entry in entries {
284                        self.send_decision_request(&entry).await;
285                    }
286                }
287                Err(e) => {
288                    error!("Failed to list pending decisions for {}: {}", did, e);
289                }
290            }
291
292            // List delivered (re-send in case external process lost them)
293            match self
294                .storage
295                .list_decisions(Some(did), Some(DecisionStatus::Delivered), None, 1000)
296                .await
297            {
298                Ok(entries) => {
299                    for entry in entries {
300                        self.send_decision_request(&entry).await;
301                    }
302                }
303                Err(e) => {
304                    error!("Failed to list delivered decisions for {}: {}", did, e);
305                }
306            }
307        }
308    }
309
310    /// Send a decision request for a DecisionLogEntry
311    async fn send_decision_request(&self, entry: &tap_node::storage::DecisionLogEntry) {
312        let params = DecisionRequestParams {
313            decision_id: entry.id,
314            transaction_id: entry.transaction_id.clone(),
315            agent_did: entry.agent_did.clone(),
316            decision_type: entry.decision_type.to_string(),
317            context: entry.context_json.clone(),
318            created_at: entry.created_at.clone(),
319        };
320
321        let req = JsonRpcRequest::new(
322            entry.id,
323            "tap/decision",
324            Some(serde_json::to_value(&params).unwrap()),
325        );
326
327        let line = serde_json::to_string(&req).unwrap();
328        self.send_line(&line).await;
329
330        // Mark as delivered
331        if entry.status == DecisionStatus::Pending {
332            if let Err(e) = self
333                .storage
334                .update_decision_status(entry.id, DecisionStatus::Delivered, None, None)
335                .await
336            {
337                error!("Failed to mark decision {} as delivered: {}", entry.id, e);
338            }
339        }
340    }
341
342    /// Handle a line from stdout
343    async fn handle_stdout_message(
344        line: &str,
345        tool_registry: &ToolRegistry,
346        pending_responses: &Mutex<
347            std::collections::HashMap<i64, tokio::sync::oneshot::Sender<Value>>,
348        >,
349        _storage: &Storage,
350        stdin_tx: &RwLock<Option<mpsc::Sender<String>>>,
351    ) {
352        // Try to parse as incoming message
353        match serde_json::from_str::<IncomingMessage>(line) {
354            Ok(IncomingMessage::Request(req)) => {
355                Self::handle_tool_call(req, tool_registry, pending_responses, stdin_tx).await;
356            }
357            Ok(IncomingMessage::Notification(notif)) => {
358                debug!(
359                    "Received notification from external process: {}",
360                    notif.method
361                );
362                // Handle tap/ready or other notifications
363            }
364            Err(_) => {
365                // Try as a JSON-RPC response (from decision requests)
366                if let Ok(resp) = serde_json::from_str::<JsonRpcResponse>(line) {
367                    let id = resp.id.as_i64().unwrap_or(-1);
368                    let mut pending = pending_responses.lock().await;
369                    if let Some(sender) = pending.remove(&id) {
370                        let _ = sender.send(resp.result);
371                    }
372                } else {
373                    warn!("Unrecognized message from external process: {}", line);
374                }
375            }
376        }
377    }
378
379    /// Handle a tool call from the external process and send response back via stdin
380    async fn handle_tool_call(
381        req: JsonRpcRequest,
382        tool_registry: &ToolRegistry,
383        _pending_responses: &Mutex<
384            std::collections::HashMap<i64, tokio::sync::oneshot::Sender<Value>>,
385        >,
386        stdin_tx: &RwLock<Option<mpsc::Sender<String>>>,
387    ) {
388        let response = match req.method.as_str() {
389            "tools/call" => {
390                let params = req.params.unwrap_or(json!({}));
391                let tool_name = params["name"].as_str().unwrap_or("");
392                let arguments = params.get("arguments").cloned();
393
394                debug!("External process calling tool: {}", tool_name);
395
396                match tool_registry.call_tool(tool_name, arguments).await {
397                    Ok(result) => JsonRpcResponse::new(
398                        req.id,
399                        json!({
400                            "content": result.content.iter().map(|c| match c {
401                                ToolContent::Text { text } => json!({"type": "text", "text": text}),
402                                _ => json!({"type": "unknown"}),
403                            }).collect::<Vec<_>>(),
404                            "isError": result.is_error.unwrap_or(false),
405                        }),
406                    ),
407                    Err(e) => {
408                        error!("Tool call failed: {}", e);
409                        JsonRpcResponse::new(
410                            req.id,
411                            json!({
412                                "content": [{"type": "text", "text": format!("Tool call failed: {}", e)}],
413                                "isError": true,
414                            }),
415                        )
416                    }
417                }
418            }
419            "tools/list" => {
420                let tools = tool_registry.list_tools();
421                JsonRpcResponse::new(req.id, json!({ "tools": tools }))
422            }
423            _ => {
424                warn!("Unknown method from external process: {}", req.method);
425                return;
426            }
427        };
428
429        // Send response back to external process via stdin
430        if let Ok(response_str) = serde_json::to_string(&response) {
431            let tx = stdin_tx.read().await;
432            if let Some(tx) = tx.as_ref() {
433                if let Err(e) = tx.send(response_str).await {
434                    debug!("Failed to send tool response to external process: {}", e);
435                }
436            }
437        }
438    }
439
440    /// Send a line to stdin
441    async fn send_line(&self, line: &str) {
442        let tx = self.stdin_tx.read().await;
443        if let Some(tx) = tx.as_ref() {
444            if let Err(e) = tx.send(line.to_string()).await {
445                debug!("Failed to send to stdin (process may be down): {}", e);
446            }
447        }
448    }
449}
450
451// Implement DecisionHandler so the FSM can delegate decisions to us
452#[async_trait]
453impl DecisionHandler for ExternalDecisionManager {
454    async fn handle_decision(
455        &self,
456        ctx: &TransactionContext,
457        decision: &tap_node::state_machine::fsm::Decision,
458    ) {
459        let (decision_type, context_json) = match decision {
460            tap_node::state_machine::fsm::Decision::AuthorizationRequired {
461                transaction_id,
462                pending_agents,
463            } => (
464                DecisionType::AuthorizationRequired,
465                json!({
466                    "transaction_state": ctx.state.to_string(),
467                    "pending_agents": pending_agents,
468                    "transaction_id": transaction_id,
469                }),
470            ),
471            tap_node::state_machine::fsm::Decision::PolicySatisfactionRequired {
472                transaction_id,
473                requested_by,
474            } => (
475                DecisionType::PolicySatisfactionRequired,
476                json!({
477                    "transaction_state": ctx.state.to_string(),
478                    "requested_by": requested_by,
479                    "transaction_id": transaction_id,
480                }),
481            ),
482            tap_node::state_machine::fsm::Decision::SettlementRequired { transaction_id } => (
483                DecisionType::SettlementRequired,
484                json!({
485                    "transaction_state": ctx.state.to_string(),
486                    "transaction_id": transaction_id,
487                }),
488            ),
489        };
490
491        let agent_did = self.agent_dids.first().cloned().unwrap_or_default();
492
493        // Insert into decision log
494        match self
495            .storage
496            .insert_decision(
497                &ctx.transaction_id,
498                &agent_did,
499                decision_type,
500                &context_json,
501            )
502            .await
503        {
504            Ok(decision_id) => {
505                debug!(
506                    "Inserted decision {} for transaction {}",
507                    decision_id, ctx.transaction_id
508                );
509
510                // If process is running, send immediately
511                if self.is_running.load(Ordering::Relaxed) {
512                    let entry = self.storage.get_decision_by_id(decision_id).await;
513                    if let Ok(Some(entry)) = entry {
514                        self.send_decision_request(&entry).await;
515                    }
516                }
517            }
518            Err(e) => {
519                error!(
520                    "Failed to insert decision for transaction {}: {}",
521                    ctx.transaction_id, e
522                );
523            }
524        }
525    }
526}
527
528// Implement EventSubscriber so we can forward events to the external process
529#[async_trait]
530impl EventSubscriber for ExternalDecisionManager {
531    async fn handle_event(&self, event: NodeEvent) {
532        // In "decisions" mode, we only handle TransactionStateChanged for expiration
533        // In "all" mode, we also forward all events
534
535        // Always handle terminal state transitions for expiration
536        if let NodeEvent::TransactionStateChanged {
537            ref transaction_id,
538            ref new_state,
539            ..
540        } = event
541        {
542            if let Ok(state) = new_state.parse::<TransactionState>() {
543                if state.is_terminal() {
544                    if let Err(e) = self
545                        .storage
546                        .expire_decisions_for_transaction(transaction_id)
547                        .await
548                    {
549                        error!(
550                            "Failed to expire decisions for transaction {}: {}",
551                            transaction_id, e
552                        );
553                    }
554                }
555            }
556        }
557
558        // In "all" mode, forward all events
559        if self.config.subscribe_mode == SubscribeMode::All
560            && self.is_running.load(Ordering::Relaxed)
561        {
562            let (event_type, agent_did, data) = match &event {
563                NodeEvent::PlainMessageReceived { message } => {
564                    ("message_received", None, message.clone())
565                }
566                NodeEvent::PlainMessageSent { message, from, to } => (
567                    "message_sent",
568                    Some(from.clone()),
569                    json!({"message": message, "to": to}),
570                ),
571                NodeEvent::TransactionStateChanged {
572                    transaction_id,
573                    old_state,
574                    new_state,
575                    agent_did,
576                } => (
577                    "transaction_state_changed",
578                    agent_did.clone(),
579                    json!({
580                        "transaction_id": transaction_id,
581                        "old_state": old_state,
582                        "new_state": new_state,
583                    }),
584                ),
585                NodeEvent::CustomerUpdated {
586                    customer_id,
587                    agent_did,
588                    update_type,
589                } => (
590                    "customer_updated",
591                    Some(agent_did.clone()),
592                    json!({
593                        "customer_id": customer_id,
594                        "update_type": update_type,
595                    }),
596                ),
597                NodeEvent::MessageReceived { message, source } => (
598                    "message_received",
599                    None,
600                    json!({
601                        "message_id": message.id,
602                        "message_type": message.type_,
603                        "from": message.from,
604                        "source": source,
605                    }),
606                ),
607                NodeEvent::MessageSent {
608                    message,
609                    destination,
610                } => (
611                    "message_sent",
612                    None,
613                    json!({
614                        "message_id": message.id,
615                        "message_type": message.type_,
616                        "destination": destination,
617                    }),
618                ),
619                _ => return, // Skip events we don't forward
620            };
621
622            let params = EventNotificationParams {
623                event_type: event_type.to_string(),
624                agent_did,
625                data,
626                timestamp: chrono::Utc::now().to_rfc3339(),
627            };
628
629            let notif =
630                JsonRpcNotification::new("tap/event", Some(serde_json::to_value(&params).unwrap()));
631
632            self.send_line(&serde_json::to_string(&notif).unwrap())
633                .await;
634        }
635    }
636}