Skip to main content

meerkat_mobkit/runtime/
gating.rs

1//! Gating subsystem — policy evaluation, audit logging, and module-backed decisions.
2
3use super::module_boundary::{
4    CORE_MODULE_MCP_TIMEOUT, MEMORY_CONFLICT_READ_MCP_TOOL, call_module_mcp_tool_json,
5    mcp_required_error, module_uses_mcp,
6};
7use super::*;
8
9impl MobkitRuntimeHandle {
10    fn next_gating_sequence(&mut self) -> u64 {
11        Self::next_sequence(&mut self.gating_sequence)
12    }
13    fn append_gating_audit(&mut self, mut entry: GatingAuditEntry) {
14        let audit_sequence = self.next_gating_sequence();
15        entry.audit_id = format!("gate-audit-{audit_sequence:06}");
16        entry.timestamp_ms = current_time_ms();
17        self.gating_audit.push(entry);
18        while self.gating_audit.len() > GATING_AUDIT_MAX_RETAINED {
19            self.gating_audit.remove(0);
20        }
21    }
22    fn refresh_gating_timeouts(&mut self) {
23        let now_ms = current_time_ms();
24        let expired = self
25            .gating_pending
26            .iter()
27            .filter(|(_, entry)| now_ms >= entry.deadline_at_ms)
28            .map(|(pending_id, _)| pending_id.clone())
29            .collect::<Vec<_>>();
30        for pending_id in expired {
31            if let Some(expired_entry) = self.gating_pending.remove(&pending_id) {
32                self.gating_pending_order
33                    .retain(|candidate| candidate != &pending_id);
34                self.append_gating_audit(GatingAuditEntry {
35                    audit_id: String::new(),
36                    timestamp_ms: 0,
37                    event_type: "timeout_fallback".to_string(),
38                    action_id: expired_entry.action_id.clone(),
39                    pending_id: Some(pending_id),
40                    actor_id: expired_entry.actor_id,
41                    risk_tier: expired_entry.risk_tier,
42                    outcome: GatingOutcome::SafeDraft,
43                    detail: serde_json::json!({
44                        "fallback": "safe_draft",
45                        "reason": "approval_timeout"
46                    }),
47                });
48            }
49        }
50    }
51    fn upsert_gating_pending_entry(&mut self, entry: GatingPendingEntry) {
52        let pending_id = entry.pending_id.clone();
53        self.gating_pending.insert(pending_id.clone(), entry);
54        self.gating_pending_order
55            .retain(|candidate| candidate != &pending_id);
56        self.gating_pending_order.push(pending_id);
57        while self.gating_pending_order.len() > GATING_PENDING_MAX_RETAINED {
58            let oldest = self.gating_pending_order.remove(0);
59            self.gating_pending.remove(&oldest);
60        }
61    }
62
63    fn parse_memory_conflict_mcp_response(
64        response: Value,
65    ) -> Result<Option<MemoryConflictSignal>, RuntimeBoundaryError> {
66        let candidate = response
67            .as_object()
68            .and_then(|payload| payload.get("conflict"))
69            .cloned()
70            .unwrap_or(response);
71        if candidate.is_null() {
72            return Ok(None);
73        }
74        serde_json::from_value::<MemoryConflictSignal>(candidate)
75            .map(Some)
76            .map_err(|error| {
77                RuntimeBoundaryError::Mcp(McpBoundaryError::InvalidToolPayload {
78                    module_id: "memory".to_string(),
79                    tool: MEMORY_CONFLICT_READ_MCP_TOOL.to_string(),
80                    reason: error.to_string(),
81                })
82            })
83    }
84
85    fn gating_memory_conflict_for_reference(
86        &self,
87        entity: Option<&str>,
88        topic: Option<&str>,
89    ) -> Result<Option<MemoryConflictSignal>, RuntimeBoundaryError> {
90        if !self.is_module_loaded("memory") {
91            return Ok(self.memory_conflict_for_reference(entity, topic));
92        }
93
94        let Some((memory_module, pre_spawn)) = self.module_and_prespawn("memory") else {
95            return Err(mcp_required_error("memory", MEMORY_CONFLICT_READ_MCP_TOOL));
96        };
97        if !module_uses_mcp(memory_module, pre_spawn) {
98            return Err(mcp_required_error("memory", MEMORY_CONFLICT_READ_MCP_TOOL));
99        }
100
101        let response = call_module_mcp_tool_json(
102            memory_module,
103            pre_spawn,
104            MEMORY_CONFLICT_READ_MCP_TOOL,
105            &serde_json::json!({
106                "entity": entity,
107                "topic": topic,
108            }),
109            CORE_MODULE_MCP_TIMEOUT,
110        )?;
111        Self::parse_memory_conflict_mcp_response(response)
112    }
113
114    pub fn evaluate_gating_action(
115        &mut self,
116        request: GatingEvaluateRequest,
117    ) -> GatingEvaluateResult {
118        self.refresh_gating_timeouts();
119        let action = request.action.trim().to_string();
120        let actor_id = request.actor_id.trim().to_string();
121        let requested_approver = request
122            .requested_approver
123            .as_deref()
124            .map(str::trim)
125            .filter(|value| !value.is_empty())
126            .map(ToString::to_string);
127        let approval_recipient = request
128            .approval_recipient
129            .as_deref()
130            .map(str::trim)
131            .filter(|value| !value.is_empty())
132            .map(ToString::to_string);
133        let approval_channel = request
134            .approval_channel
135            .as_deref()
136            .map(str::trim)
137            .filter(|value| !value.is_empty())
138            .map(ToString::to_string);
139        let entity = request
140            .entity
141            .as_deref()
142            .map(str::trim)
143            .filter(|value| !value.is_empty())
144            .map(ToString::to_string);
145        let topic = request
146            .topic
147            .as_deref()
148            .map(str::trim)
149            .filter(|value| !value.is_empty())
150            .map(ToString::to_string);
151        let action_sequence = self.next_gating_sequence();
152        let action_id = format!("gate-action-{action_sequence:06}");
153        let risk_tier = request.risk_tier.clone();
154
155        if matches!(request.risk_tier, GatingRiskTier::R2 | GatingRiskTier::R3) {
156            if !self.memory_conflicts.is_empty() && (entity.is_none() || topic.is_none()) {
157                self.append_gating_audit(GatingAuditEntry {
158                    audit_id: String::new(),
159                    timestamp_ms: 0,
160                    event_type: "conflict_blocked".to_string(),
161                    action_id: action_id.clone(),
162                    pending_id: None,
163                    actor_id: actor_id.clone(),
164                    risk_tier: risk_tier.clone(),
165                    outcome: GatingOutcome::SafeDraft,
166                    detail: serde_json::json!({
167                        "policy": "memory_conflict_context_required_v0_1",
168                        "reason": "memory_conflict_context_missing",
169                        "action": action,
170                        "reference": {
171                            "entity": entity,
172                            "topic": topic,
173                        },
174                        "missing_context": {
175                            "entity": entity.is_none(),
176                            "topic": topic.is_none(),
177                        },
178                        "conflict_count": self.memory_conflicts.len(),
179                    }),
180                });
181                return GatingEvaluateResult {
182                    action_id,
183                    action,
184                    actor_id,
185                    risk_tier,
186                    outcome: GatingOutcome::SafeDraft,
187                    pending_id: None,
188                    fallback_reason: Some("memory_conflict_context_missing".to_string()),
189                };
190            }
191            let conflict = match self
192                .gating_memory_conflict_for_reference(entity.as_deref(), topic.as_deref())
193            {
194                Ok(conflict) => conflict,
195                Err(error) => {
196                    self.append_gating_audit(GatingAuditEntry {
197                        audit_id: String::new(),
198                        timestamp_ms: 0,
199                        event_type: "memory_conflict_lookup_failed".to_string(),
200                        action_id: action_id.clone(),
201                        pending_id: None,
202                        actor_id: actor_id.clone(),
203                        risk_tier: risk_tier.clone(),
204                        outcome: GatingOutcome::SafeDraft,
205                        detail: serde_json::json!({
206                            "policy": "memory_conflict_lookup_via_core_mcp",
207                            "reason": "memory_conflict_lookup_failed",
208                            "error": format!("{error:?}"),
209                            "reference": {
210                                "entity": entity,
211                                "topic": topic,
212                            },
213                        }),
214                    });
215                    return GatingEvaluateResult {
216                        action_id,
217                        action,
218                        actor_id,
219                        risk_tier,
220                        outcome: GatingOutcome::SafeDraft,
221                        pending_id: None,
222                        fallback_reason: Some("memory_conflict_lookup_failed".to_string()),
223                    };
224                }
225            };
226            if let Some(conflict) = conflict {
227                self.append_gating_audit(GatingAuditEntry {
228                    audit_id: String::new(),
229                    timestamp_ms: 0,
230                    event_type: "conflict_blocked".to_string(),
231                    action_id: action_id.clone(),
232                    pending_id: None,
233                    actor_id: actor_id.clone(),
234                    risk_tier: risk_tier.clone(),
235                    outcome: GatingOutcome::SafeDraft,
236                    detail: serde_json::json!({
237                        "policy": "memory_conflict_block_v0_1",
238                        "reason": "memory_conflict",
239                        "action": action,
240                        "reference": {
241                            "entity": entity,
242                            "topic": topic,
243                        },
244                        "conflict": conflict,
245                    }),
246                });
247                return GatingEvaluateResult {
248                    action_id,
249                    action,
250                    actor_id,
251                    risk_tier,
252                    outcome: GatingOutcome::SafeDraft,
253                    pending_id: None,
254                    fallback_reason: Some("memory_conflict".to_string()),
255                };
256            }
257        }
258
259        match request.risk_tier {
260            GatingRiskTier::R0 | GatingRiskTier::R1 => {
261                self.append_gating_audit(GatingAuditEntry {
262                    audit_id: String::new(),
263                    timestamp_ms: 0,
264                    event_type: "evaluated".to_string(),
265                    action_id: action_id.clone(),
266                    pending_id: None,
267                    actor_id: actor_id.clone(),
268                    risk_tier: risk_tier.clone(),
269                    outcome: GatingOutcome::Allowed,
270                    detail: serde_json::json!({
271                        "policy": "allow_immediate",
272                        "rationale": request.rationale,
273                        "action": action,
274                    }),
275                });
276                GatingEvaluateResult {
277                    action_id,
278                    action,
279                    actor_id,
280                    risk_tier,
281                    outcome: GatingOutcome::Allowed,
282                    pending_id: None,
283                    fallback_reason: None,
284                }
285            }
286            GatingRiskTier::R2 => {
287                self.append_gating_audit(GatingAuditEntry {
288                    audit_id: String::new(),
289                    timestamp_ms: 0,
290                    event_type: "evaluated".to_string(),
291                    action_id: action_id.clone(),
292                    pending_id: None,
293                    actor_id: actor_id.clone(),
294                    risk_tier: risk_tier.clone(),
295                    outcome: GatingOutcome::AllowedWithAudit,
296                    detail: serde_json::json!({
297                        "policy": "consequence_mode_allow_with_audit_v0_1",
298                        "rationale": request.rationale,
299                        "action": action,
300                    }),
301                });
302                GatingEvaluateResult {
303                    action_id,
304                    action,
305                    actor_id,
306                    risk_tier,
307                    outcome: GatingOutcome::AllowedWithAudit,
308                    pending_id: None,
309                    fallback_reason: None,
310                }
311            }
312            GatingRiskTier::R3 => {
313                let pending_sequence = self.next_gating_sequence();
314                let pending_id = format!("gate-pending-{pending_sequence:06}");
315                let created_at_ms = current_time_ms();
316                let timeout_ms = request
317                    .approval_timeout_ms
318                    .unwrap_or(GATING_APPROVAL_TIMEOUT_DEFAULT_MS);
319                let mut approval_route_id = None;
320                let mut approval_delivery_id = None;
321                let mut approval_notification_error = None;
322
323                if let (Some(recipient), Some(channel)) =
324                    (approval_recipient.as_ref(), approval_channel.as_ref())
325                {
326                    if self.is_module_loaded("router") && self.is_module_loaded("delivery") {
327                        match self.resolve_routing(RoutingResolveRequest {
328                            recipient: recipient.clone(),
329                            channel: Some(channel.clone()),
330                            retry_max: None,
331                            backoff_ms: None,
332                            rate_limit_per_minute: None,
333                        }) {
334                            Ok(resolution) => {
335                                approval_route_id = Some(resolution.route_id.clone());
336                                match self.send_delivery(DeliverySendRequest {
337                                    resolution,
338                                    payload: serde_json::json!({
339                                        "kind": "gating_approval_request",
340                                        "pending_id": pending_id,
341                                        "action_id": action_id,
342                                        "action": action,
343                                        "actor_id": actor_id,
344                                        "risk_tier": risk_tier,
345                                        "requested_approver": requested_approver,
346                                        "deadline_at_ms": created_at_ms.saturating_add(timeout_ms),
347                                    }),
348                                    idempotency_key: Some(format!("gating-approval-{pending_id}")),
349                                }) {
350                                    Ok(record) => {
351                                        if record.status == "sent" {
352                                            approval_delivery_id = Some(record.delivery_id);
353                                        } else {
354                                            approval_notification_error = Some(format!(
355                                                "delivery_status:{}:{}",
356                                                record.status, record.delivery_id
357                                            ));
358                                        }
359                                    }
360                                    Err(err) => {
361                                        approval_notification_error =
362                                            Some(format!("delivery:{err:?}"));
363                                    }
364                                }
365                            }
366                            Err(err) => {
367                                approval_notification_error = Some(format!("routing:{err:?}"));
368                            }
369                        }
370                    } else {
371                        let mut missing_modules = Vec::new();
372                        if !self.is_module_loaded("router") {
373                            missing_modules.push("router");
374                        }
375                        if !self.is_module_loaded("delivery") {
376                            missing_modules.push("delivery");
377                        }
378                        approval_notification_error = Some(format!(
379                            "notification_modules_unavailable:{}",
380                            missing_modules.join(",")
381                        ));
382                    }
383                }
384                let pending_entry = GatingPendingEntry {
385                    pending_id: pending_id.clone(),
386                    action_id: action_id.clone(),
387                    action: action.clone(),
388                    actor_id: actor_id.clone(),
389                    risk_tier: risk_tier.clone(),
390                    requested_approver,
391                    approval_recipient,
392                    approval_channel,
393                    approval_route_id,
394                    approval_delivery_id,
395                    created_at_ms,
396                    deadline_at_ms: created_at_ms.saturating_add(timeout_ms),
397                };
398                self.upsert_gating_pending_entry(pending_entry.clone());
399                self.append_gating_audit(GatingAuditEntry {
400                    audit_id: String::new(),
401                    timestamp_ms: 0,
402                    event_type: "pending_created".to_string(),
403                    action_id: action_id.clone(),
404                    pending_id: Some(pending_id.clone()),
405                    actor_id: actor_id.clone(),
406                    risk_tier: risk_tier.clone(),
407                    outcome: GatingOutcome::PendingApproval,
408                    detail: serde_json::json!({
409                        "requested_approver": pending_entry.requested_approver,
410                        "approval_recipient": pending_entry.approval_recipient,
411                        "approval_channel": pending_entry.approval_channel,
412                        "approval_route_id": pending_entry.approval_route_id,
413                        "approval_delivery_id": pending_entry.approval_delivery_id,
414                        "approval_notification_error": approval_notification_error,
415                        "deadline_at_ms": pending_entry.deadline_at_ms,
416                        "action": action,
417                    }),
418                });
419                GatingEvaluateResult {
420                    action_id,
421                    action,
422                    actor_id,
423                    risk_tier,
424                    outcome: GatingOutcome::PendingApproval,
425                    pending_id: Some(pending_id),
426                    fallback_reason: None,
427                }
428            }
429        }
430    }
431
432    pub fn list_gating_pending(&mut self) -> Vec<GatingPendingEntry> {
433        self.refresh_gating_timeouts();
434        self.gating_pending_order
435            .iter()
436            .filter_map(|pending_id| self.gating_pending.get(pending_id).cloned())
437            .collect()
438    }
439
440    pub fn decide_gating_action(
441        &mut self,
442        request: GatingDecideRequest,
443    ) -> Result<GatingDecisionResult, GatingDecideError> {
444        self.refresh_gating_timeouts();
445        let decision = request.decision.clone();
446        let reason = request.reason.clone();
447        let pending_id = request.pending_id.trim().to_string();
448        let approver_id = request.approver_id.trim().to_string();
449        let pending_entry = self
450            .gating_pending
451            .remove(&pending_id)
452            .ok_or_else(|| GatingDecideError::UnknownPendingId(pending_id.clone()))?;
453        self.gating_pending_order
454            .retain(|candidate| candidate != &pending_id);
455
456        if matches!(decision, GatingDecision::Approve) && approver_id == pending_entry.actor_id {
457            self.upsert_gating_pending_entry(pending_entry);
458            return Err(GatingDecideError::SelfApprovalForbidden);
459        }
460        if let Some(expected_approver) = pending_entry.requested_approver.as_deref()
461            && expected_approver != approver_id
462        {
463            let expected = expected_approver.to_string();
464            self.upsert_gating_pending_entry(pending_entry);
465            return Err(GatingDecideError::ApproverMismatch {
466                expected,
467                provided: approver_id,
468            });
469        }
470
471        let mut next_pending_id = None;
472        let (outcome, event_type) = match decision {
473            GatingDecision::Approve => (GatingOutcome::Allowed, "approval_decided"),
474            GatingDecision::Reject => (GatingOutcome::SafeDraft, "rejection_decided"),
475            GatingDecision::Escalate => {
476                let successor_sequence = self.next_gating_sequence();
477                let successor_pending_id = format!("gate-pending-{successor_sequence:06}");
478                let successor_entry = GatingPendingEntry {
479                    pending_id: successor_pending_id.clone(),
480                    action_id: pending_entry.action_id.clone(),
481                    action: pending_entry.action.clone(),
482                    actor_id: pending_entry.actor_id.clone(),
483                    risk_tier: pending_entry.risk_tier.clone(),
484                    requested_approver: None,
485                    approval_recipient: pending_entry.approval_recipient.clone(),
486                    approval_channel: pending_entry.approval_channel.clone(),
487                    approval_route_id: None,
488                    approval_delivery_id: None,
489                    created_at_ms: current_time_ms(),
490                    deadline_at_ms: pending_entry.deadline_at_ms,
491                };
492                self.upsert_gating_pending_entry(successor_entry.clone());
493                next_pending_id = Some(successor_pending_id.clone());
494                self.append_gating_audit(GatingAuditEntry {
495                    audit_id: String::new(),
496                    timestamp_ms: 0,
497                    event_type: "pending_created".to_string(),
498                    action_id: successor_entry.action_id.clone(),
499                    pending_id: Some(successor_pending_id),
500                    actor_id: successor_entry.actor_id.clone(),
501                    risk_tier: successor_entry.risk_tier.clone(),
502                    outcome: GatingOutcome::PendingApproval,
503                    detail: serde_json::json!({
504                        "escalated_from_pending_id": pending_id.clone(),
505                        "requested_approver": successor_entry.requested_approver,
506                        "approval_recipient": successor_entry.approval_recipient,
507                        "approval_channel": successor_entry.approval_channel,
508                        "approval_route_id": successor_entry.approval_route_id,
509                        "approval_delivery_id": successor_entry.approval_delivery_id,
510                        "deadline_at_ms": successor_entry.deadline_at_ms,
511                        "action": successor_entry.action,
512                    }),
513                });
514                (GatingOutcome::PendingApproval, "escalation_decided")
515            }
516        };
517        let decided_at_ms = current_time_ms();
518        self.append_gating_audit(GatingAuditEntry {
519            audit_id: String::new(),
520            timestamp_ms: 0,
521            event_type: event_type.to_string(),
522            action_id: pending_entry.action_id.clone(),
523            pending_id: Some(pending_id.clone()),
524            actor_id: pending_entry.actor_id.clone(),
525            risk_tier: pending_entry.risk_tier.clone(),
526            outcome: outcome.clone(),
527            detail: serde_json::json!({
528                "approver_id": approver_id,
529                "decision": decision,
530                "reason": reason,
531                "approval_route_id": pending_entry.approval_route_id,
532                "approval_delivery_id": pending_entry.approval_delivery_id,
533                "next_pending_id": next_pending_id,
534            }),
535        });
536        Ok(GatingDecisionResult {
537            pending_id,
538            action_id: pending_entry.action_id,
539            approver_id,
540            decision,
541            outcome,
542            decided_at_ms,
543            reason,
544            next_pending_id,
545        })
546    }
547
548    pub fn gating_audit_entries(&mut self, limit: usize) -> Vec<GatingAuditEntry> {
549        self.refresh_gating_timeouts();
550        self.gating_audit
551            .iter()
552            .rev()
553            .take(limit)
554            .cloned()
555            .collect::<Vec<_>>()
556            .into_iter()
557            .rev()
558            .collect()
559    }
560}