tuitbot_core/mutation_gateway/
mod.rs1#[cfg(test)]
16mod tests;
17
18use crate::config::{McpPolicyConfig, OperatingMode};
19use crate::error::StorageError;
20use crate::mcp_policy::types::PolicyRateLimit;
21use crate::mcp_policy::{McpPolicyEvaluator, PolicyDecision, PolicyDenialReason};
22use crate::storage::mutation_audit;
23use crate::storage::DbPool;
24
25const IDEMPOTENCY_WINDOW_SECS: u32 = 300;
27
28pub struct MutationGateway;
33
34pub struct MutationRequest<'a> {
36 pub pool: &'a DbPool,
37 pub policy_config: &'a McpPolicyConfig,
38 pub mode: &'a OperatingMode,
39 pub tool_name: &'a str,
40 pub params_json: &'a str,
41}
42
43#[derive(Debug)]
45pub enum GatewayDecision {
46 Proceed(MutationTicket),
50
51 Denied(GatewayDenial),
53
54 RoutedToApproval {
56 queue_id: i64,
57 reason: String,
58 rule_id: Option<String>,
59 },
60
61 DryRun { rule_id: Option<String> },
63
64 Duplicate(DuplicateInfo),
66}
67
68#[derive(Debug, Clone)]
70pub struct GatewayDenial {
71 pub reason: PolicyDenialReason,
72 pub rule_id: Option<String>,
73}
74
75#[derive(Debug, Clone)]
77pub struct DuplicateInfo {
78 pub original_correlation_id: String,
79 pub cached_result: Option<String>,
80 pub audit_id: i64,
81}
82
83#[derive(Debug)]
88pub struct MutationTicket {
89 pub audit_id: i64,
90 pub correlation_id: String,
91 pub tool_name: String,
92}
93
94impl MutationGateway {
95 pub async fn evaluate(req: &MutationRequest<'_>) -> Result<GatewayDecision, StorageError> {
105 let decision =
107 McpPolicyEvaluator::evaluate(req.pool, req.policy_config, req.mode, req.tool_name)
108 .await?;
109
110 let _ = McpPolicyEvaluator::log_decision(req.pool, req.tool_name, &decision).await;
112
113 match decision {
114 PolicyDecision::Deny { reason, rule_id } => {
115 return Ok(GatewayDecision::Denied(GatewayDenial { reason, rule_id }));
116 }
117 PolicyDecision::RouteToApproval { reason, rule_id } => {
118 let queue_id = crate::storage::approval_queue::enqueue_with_context(
120 req.pool,
121 req.tool_name,
122 "",
123 "",
124 req.params_json,
125 "mcp_policy",
126 req.tool_name,
127 0.0,
128 "[]",
129 Some(&reason),
130 Some(&match &rule_id {
131 Some(rid) => format!(r#"["policy_rule:{rid}"]"#),
132 None => "[]".to_string(),
133 }),
134 )
135 .await
136 .map_err(|e| StorageError::Query {
137 source: sqlx::Error::Protocol(format!("Failed to enqueue for approval: {e}")),
138 })?;
139
140 return Ok(GatewayDecision::RoutedToApproval {
141 queue_id,
142 reason,
143 rule_id,
144 });
145 }
146 PolicyDecision::DryRun { rule_id } => {
147 return Ok(GatewayDecision::DryRun { rule_id });
148 }
149 PolicyDecision::Allow => { }
150 }
151
152 let params_hash = mutation_audit::compute_params_hash(req.tool_name, req.params_json);
154 let params_summary = mutation_audit::truncate_summary(req.params_json, 500);
155
156 if let Some(existing) = mutation_audit::find_recent_duplicate(
157 req.pool,
158 req.tool_name,
159 ¶ms_hash,
160 IDEMPOTENCY_WINDOW_SECS,
161 )
162 .await?
163 {
164 let dup_corr = generate_correlation_id();
166 let dup_id = mutation_audit::insert_pending(
167 req.pool,
168 &dup_corr,
169 None,
170 req.tool_name,
171 ¶ms_hash,
172 ¶ms_summary,
173 )
174 .await?;
175 let _ =
176 mutation_audit::mark_duplicate(req.pool, dup_id, &existing.correlation_id).await;
177
178 return Ok(GatewayDecision::Duplicate(DuplicateInfo {
179 original_correlation_id: existing.correlation_id,
180 cached_result: existing.result_summary,
181 audit_id: dup_id,
182 }));
183 }
184
185 let correlation_id = generate_correlation_id();
187 let audit_id = mutation_audit::insert_pending(
188 req.pool,
189 &correlation_id,
190 None,
191 req.tool_name,
192 ¶ms_hash,
193 ¶ms_summary,
194 )
195 .await?;
196
197 Ok(GatewayDecision::Proceed(MutationTicket {
198 audit_id,
199 correlation_id,
200 tool_name: req.tool_name.to_string(),
201 }))
202 }
203
204 pub async fn complete_success(
206 pool: &DbPool,
207 ticket: &MutationTicket,
208 result_summary: &str,
209 rollback_action: Option<&str>,
210 elapsed_ms: u64,
211 rate_limit_configs: &[PolicyRateLimit],
212 ) -> Result<(), StorageError> {
213 let summary = mutation_audit::truncate_summary(result_summary, 500);
214 mutation_audit::complete_success(
215 pool,
216 ticket.audit_id,
217 &summary,
218 rollback_action,
219 elapsed_ms,
220 )
221 .await?;
222
223 McpPolicyEvaluator::record_mutation(pool, &ticket.tool_name, rate_limit_configs).await?;
224
225 Ok(())
226 }
227
228 pub async fn complete_failure(
230 pool: &DbPool,
231 ticket: &MutationTicket,
232 error_message: &str,
233 elapsed_ms: u64,
234 ) -> Result<(), StorageError> {
235 mutation_audit::complete_failure(pool, ticket.audit_id, error_message, elapsed_ms).await
236 }
237}
238
239fn generate_correlation_id() -> String {
241 use std::collections::hash_map::DefaultHasher;
242 use std::hash::{Hash, Hasher};
243 use std::sync::atomic::{AtomicU64, Ordering};
244 use std::time::SystemTime;
245
246 static COUNTER: AtomicU64 = AtomicU64::new(0);
247
248 let now = SystemTime::now()
249 .duration_since(SystemTime::UNIX_EPOCH)
250 .unwrap_or_default();
251 let nanos = now.as_nanos();
252 let count = COUNTER.fetch_add(1, Ordering::Relaxed);
253
254 let mut hasher = DefaultHasher::new();
255 nanos.hash(&mut hasher);
256 count.hash(&mut hasher);
257 std::thread::current().id().hash(&mut hasher);
258 let h1 = hasher.finish();
259 count.wrapping_add(1).hash(&mut hasher);
260 let h2 = hasher.finish();
261
262 format!(
263 "{:08x}-{:04x}-4{:03x}-{:04x}-{:012x}",
264 (h1 >> 32) as u32,
265 (h1 >> 16) as u16,
266 h1 as u16 & 0x0fff,
267 (h2 >> 48) as u16 & 0x3fff | 0x8000,
268 h2 & 0xffffffffffff,
269 )
270}