Skip to main content

tuitbot_core/mutation_gateway/
mod.rs

1//! Unified mutation governance gateway.
2//!
3//! Every mutation path — MCP tools, autopilot loops, HTTP API — routes
4//! through [`MutationGateway`] before executing side effects.
5//!
6//! The gateway enforces a strict sequence:
7//! 1. **Policy evaluation** — block rules, rate limits, approval routing, dry-run
8//! 2. **Idempotency** — DB-backed dedup within a 5-minute window
9//! 3. **Audit record** — pending entry before execution
10//! 4. **Post-execution recording** — rate-limit increment + audit completion
11//!
12//! This single path replaces the scattered policy/idempotency/audit logic
13//! that was previously duplicated across MCP tool handlers.
14
15#[cfg(test)]
16mod tests;
17
18use crate::config::{McpPolicyConfig, OperatingMode};
19use crate::error::StorageError;
20use crate::mcp_policy::types::PolicyRateLimit;
21use crate::mcp_policy::{McpPolicyEvaluator, PolicyDecision, PolicyDenialReason};
22use crate::storage::mutation_audit;
23use crate::storage::DbPool;
24
/// Length of the DB-backed idempotency window, in seconds (5 minutes).
///
/// Handed to the duplicate lookup: a request whose tool name and params
/// hash match an audit entry inside this window is treated as a duplicate.
const IDEMPOTENCY_WINDOW_SECS: u32 = 5 * 60;
27
/// Unified mutation governance gateway.
///
/// A zero-sized, stateless marker type: every dependency is supplied per
/// call through [`MutationRequest`] (or as explicit arguments), so any
/// consumer (MCP, autopilot, HTTP server) can use the gateway without
/// constructing or sharing an instance.
///
/// The standard traits are derived so the marker can be logged, copied,
/// or default-constructed like the other public types in this module.
#[derive(Debug, Clone, Copy, Default)]
pub struct MutationGateway;
33
34/// Input parameters for a gateway evaluation.
35pub struct MutationRequest<'a> {
36    pub pool: &'a DbPool,
37    pub policy_config: &'a McpPolicyConfig,
38    pub mode: &'a OperatingMode,
39    pub tool_name: &'a str,
40    pub params_json: &'a str,
41}
42
43/// The gateway's decision for a mutation request.
44#[derive(Debug)]
45pub enum GatewayDecision {
46    /// The mutation may proceed. Carry the ticket through execution
47    /// and call [`MutationGateway::complete_success`] or
48    /// [`MutationGateway::complete_failure`] afterward.
49    Proceed(MutationTicket),
50
51    /// The mutation was denied by policy.
52    Denied(GatewayDenial),
53
54    /// The mutation was routed to the approval queue.
55    RoutedToApproval {
56        queue_id: i64,
57        reason: String,
58        rule_id: Option<String>,
59    },
60
61    /// Dry-run mode: the mutation would have executed but was intercepted.
62    DryRun { rule_id: Option<String> },
63
64    /// Idempotency hit: an identical recent mutation was already successful.
65    Duplicate(DuplicateInfo),
66}
67
68/// Details of a policy denial.
69#[derive(Debug, Clone)]
70pub struct GatewayDenial {
71    pub reason: PolicyDenialReason,
72    pub rule_id: Option<String>,
73}
74
/// Describes the earlier mutation that a duplicate request collided with.
#[derive(Debug, Clone)]
pub struct DuplicateInfo {
    /// Correlation ID of the earlier, matching audit entry.
    pub original_correlation_id: String,
    /// Result summary stored on the earlier entry, when one was recorded.
    pub cached_result: Option<String>,
    /// ID of the new audit row written for this duplicate attempt.
    pub audit_id: i64,
}
82
/// Receipt for a mutation that was cleared to run.
///
/// The gateway hands one out with [`GatewayDecision::Proceed`]; the caller
/// keeps it while executing the side effect and returns it to the gateway
/// so the pending audit record can be completed.
#[derive(Debug)]
pub struct MutationTicket {
    /// ID of the pending audit row created for this mutation.
    pub audit_id: i64,
    /// Correlation ID generated for this mutation.
    pub correlation_id: String,
    /// Name of the tool being executed (copied from the request).
    pub tool_name: String,
}
93
94impl MutationGateway {
95    /// Evaluate a mutation request through the full governance pipeline.
96    ///
97    /// Sequence:
98    /// 1. Policy evaluation (block rules, rate limits, approval routing)
99    /// 2. DB-backed idempotency check (5-minute window)
100    /// 3. Pending audit record creation
101    ///
102    /// Returns a [`GatewayDecision`] indicating whether the mutation may
103    /// proceed, was denied, routed to approval, or is a duplicate.
104    pub async fn evaluate(req: &MutationRequest<'_>) -> Result<GatewayDecision, StorageError> {
105        // ── Step 1: Policy evaluation ──────────────────────────────────
106        let decision =
107            McpPolicyEvaluator::evaluate(req.pool, req.policy_config, req.mode, req.tool_name)
108                .await?;
109
110        // Log the policy decision (best-effort).
111        let _ = McpPolicyEvaluator::log_decision(req.pool, req.tool_name, &decision).await;
112
113        match decision {
114            PolicyDecision::Deny { reason, rule_id } => {
115                return Ok(GatewayDecision::Denied(GatewayDenial { reason, rule_id }));
116            }
117            PolicyDecision::RouteToApproval { reason, rule_id } => {
118                // Enqueue into approval queue.
119                let queue_id = crate::storage::approval_queue::enqueue_with_context(
120                    req.pool,
121                    req.tool_name,
122                    "",
123                    "",
124                    req.params_json,
125                    "mcp_policy",
126                    req.tool_name,
127                    0.0,
128                    "[]",
129                    Some(&reason),
130                    Some(&match &rule_id {
131                        Some(rid) => format!(r#"["policy_rule:{rid}"]"#),
132                        None => "[]".to_string(),
133                    }),
134                )
135                .await
136                .map_err(|e| StorageError::Query {
137                    source: sqlx::Error::Protocol(format!("Failed to enqueue for approval: {e}")),
138                })?;
139
140                return Ok(GatewayDecision::RoutedToApproval {
141                    queue_id,
142                    reason,
143                    rule_id,
144                });
145            }
146            PolicyDecision::DryRun { rule_id } => {
147                return Ok(GatewayDecision::DryRun { rule_id });
148            }
149            PolicyDecision::Allow => { /* continue to idempotency check */ }
150        }
151
152        // ── Step 2: DB-backed idempotency (5-minute window) ────────────
153        let params_hash = mutation_audit::compute_params_hash(req.tool_name, req.params_json);
154        let params_summary = mutation_audit::truncate_summary(req.params_json, 500);
155
156        if let Some(existing) = mutation_audit::find_recent_duplicate(
157            req.pool,
158            req.tool_name,
159            &params_hash,
160            IDEMPOTENCY_WINDOW_SECS,
161        )
162        .await?
163        {
164            // Record the duplicate attempt in audit trail.
165            let dup_corr = generate_correlation_id();
166            let dup_id = mutation_audit::insert_pending(
167                req.pool,
168                &dup_corr,
169                None,
170                req.tool_name,
171                &params_hash,
172                &params_summary,
173            )
174            .await?;
175            let _ =
176                mutation_audit::mark_duplicate(req.pool, dup_id, &existing.correlation_id).await;
177
178            return Ok(GatewayDecision::Duplicate(DuplicateInfo {
179                original_correlation_id: existing.correlation_id,
180                cached_result: existing.result_summary,
181                audit_id: dup_id,
182            }));
183        }
184
185        // ── Step 3: Create pending audit record ────────────────────────
186        let correlation_id = generate_correlation_id();
187        let audit_id = mutation_audit::insert_pending(
188            req.pool,
189            &correlation_id,
190            None,
191            req.tool_name,
192            &params_hash,
193            &params_summary,
194        )
195        .await?;
196
197        Ok(GatewayDecision::Proceed(MutationTicket {
198            audit_id,
199            correlation_id,
200            tool_name: req.tool_name.to_string(),
201        }))
202    }
203
204    /// Record a successful mutation: complete audit + increment rate counters.
205    pub async fn complete_success(
206        pool: &DbPool,
207        ticket: &MutationTicket,
208        result_summary: &str,
209        rollback_action: Option<&str>,
210        elapsed_ms: u64,
211        rate_limit_configs: &[PolicyRateLimit],
212    ) -> Result<(), StorageError> {
213        let summary = mutation_audit::truncate_summary(result_summary, 500);
214        mutation_audit::complete_success(
215            pool,
216            ticket.audit_id,
217            &summary,
218            rollback_action,
219            elapsed_ms,
220        )
221        .await?;
222
223        McpPolicyEvaluator::record_mutation(pool, &ticket.tool_name, rate_limit_configs).await?;
224
225        Ok(())
226    }
227
228    /// Record a failed mutation in the audit trail.
229    pub async fn complete_failure(
230        pool: &DbPool,
231        ticket: &MutationTicket,
232        error_message: &str,
233        elapsed_ms: u64,
234    ) -> Result<(), StorageError> {
235        mutation_audit::complete_failure(pool, ticket.audit_id, error_message, elapsed_ms).await
236    }
237}
238
/// Produce a correlation ID formatted like a version-4 UUID.
///
/// The ID is not drawn from a random source: it is derived by hashing the
/// current time since the Unix epoch (nanoseconds), a process-wide call
/// counter, and the current thread's ID. Two hash outputs supply the
/// digits; the version nibble is fixed to `4` and the variant bits to `10`,
/// giving the familiar `xxxxxxxx-xxxx-4xxx-[89ab]xxx-xxxxxxxxxxxx` shape.
fn generate_correlation_id() -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Process-wide sequence number so that calls landing on the same
    // clock reading still hash different inputs.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    // A clock set before the epoch falls back to zero instead of failing.
    let epoch_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or_default();
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    let mut state = DefaultHasher::new();
    epoch_nanos.hash(&mut state);
    seq.hash(&mut state);
    std::thread::current().id().hash(&mut state);
    let high = state.finish();

    // Feed one more value into the same hasher to obtain a second word.
    seq.wrapping_add(1).hash(&mut state);
    let low = state.finish();

    // Field names mirror the UUID layout.
    let time_low = (high >> 32) as u32;
    let time_mid = (high >> 16) as u16;
    let time_hi = (high as u16) & 0x0fff;
    let clock_seq = (((low >> 48) as u16) & 0x3fff) | 0x8000;
    let node = low & 0xffff_ffff_ffff;

    format!("{time_low:08x}-{time_mid:04x}-4{time_hi:03x}-{clock_seq:04x}-{node:012x}")
}