1use super::module_boundary::{
4 CORE_MODULE_MCP_TIMEOUT, MEMORY_CONFLICT_READ_MCP_TOOL, call_module_mcp_tool_json,
5 mcp_required_error, module_uses_mcp,
6};
7use super::*;
8
9impl MobkitRuntimeHandle {
10 fn next_gating_sequence(&mut self) -> u64 {
11 Self::next_sequence(&mut self.gating_sequence)
12 }
13 fn append_gating_audit(&mut self, mut entry: GatingAuditEntry) {
14 let audit_sequence = self.next_gating_sequence();
15 entry.audit_id = format!("gate-audit-{audit_sequence:06}");
16 entry.timestamp_ms = current_time_ms();
17 self.gating_audit.push(entry);
18 while self.gating_audit.len() > GATING_AUDIT_MAX_RETAINED {
19 self.gating_audit.remove(0);
20 }
21 }
22 fn refresh_gating_timeouts(&mut self) {
23 let now_ms = current_time_ms();
24 let expired = self
25 .gating_pending
26 .iter()
27 .filter(|(_, entry)| now_ms >= entry.deadline_at_ms)
28 .map(|(pending_id, _)| pending_id.clone())
29 .collect::<Vec<_>>();
30 for pending_id in expired {
31 if let Some(expired_entry) = self.gating_pending.remove(&pending_id) {
32 self.gating_pending_order
33 .retain(|candidate| candidate != &pending_id);
34 self.append_gating_audit(GatingAuditEntry {
35 audit_id: String::new(),
36 timestamp_ms: 0,
37 event_type: "timeout_fallback".to_string(),
38 action_id: expired_entry.action_id.clone(),
39 pending_id: Some(pending_id),
40 actor_id: expired_entry.actor_id,
41 risk_tier: expired_entry.risk_tier,
42 outcome: GatingOutcome::SafeDraft,
43 detail: serde_json::json!({
44 "fallback": "safe_draft",
45 "reason": "approval_timeout"
46 }),
47 });
48 }
49 }
50 }
51 fn upsert_gating_pending_entry(&mut self, entry: GatingPendingEntry) {
52 let pending_id = entry.pending_id.clone();
53 self.gating_pending.insert(pending_id.clone(), entry);
54 self.gating_pending_order
55 .retain(|candidate| candidate != &pending_id);
56 self.gating_pending_order.push(pending_id);
57 while self.gating_pending_order.len() > GATING_PENDING_MAX_RETAINED {
58 let oldest = self.gating_pending_order.remove(0);
59 self.gating_pending.remove(&oldest);
60 }
61 }
62
63 fn parse_memory_conflict_mcp_response(
64 response: Value,
65 ) -> Result<Option<MemoryConflictSignal>, RuntimeBoundaryError> {
66 let candidate = response
67 .as_object()
68 .and_then(|payload| payload.get("conflict"))
69 .cloned()
70 .unwrap_or(response);
71 if candidate.is_null() {
72 return Ok(None);
73 }
74 serde_json::from_value::<MemoryConflictSignal>(candidate)
75 .map(Some)
76 .map_err(|error| {
77 RuntimeBoundaryError::Mcp(McpBoundaryError::InvalidToolPayload {
78 module_id: "memory".to_string(),
79 tool: MEMORY_CONFLICT_READ_MCP_TOOL.to_string(),
80 reason: error.to_string(),
81 })
82 })
83 }
84
85 fn gating_memory_conflict_for_reference(
86 &self,
87 entity: Option<&str>,
88 topic: Option<&str>,
89 ) -> Result<Option<MemoryConflictSignal>, RuntimeBoundaryError> {
90 if !self.is_module_loaded("memory") {
91 return Ok(self.memory_conflict_for_reference(entity, topic));
92 }
93
94 let Some((memory_module, pre_spawn)) = self.module_and_prespawn("memory") else {
95 return Err(mcp_required_error("memory", MEMORY_CONFLICT_READ_MCP_TOOL));
96 };
97 if !module_uses_mcp(memory_module, pre_spawn) {
98 return Err(mcp_required_error("memory", MEMORY_CONFLICT_READ_MCP_TOOL));
99 }
100
101 let response = call_module_mcp_tool_json(
102 memory_module,
103 pre_spawn,
104 MEMORY_CONFLICT_READ_MCP_TOOL,
105 &serde_json::json!({
106 "entity": entity,
107 "topic": topic,
108 }),
109 CORE_MODULE_MCP_TIMEOUT,
110 )?;
111 Self::parse_memory_conflict_mcp_response(response)
112 }
113
114 pub fn evaluate_gating_action(
115 &mut self,
116 request: GatingEvaluateRequest,
117 ) -> GatingEvaluateResult {
118 self.refresh_gating_timeouts();
119 let action = request.action.trim().to_string();
120 let actor_id = request.actor_id.trim().to_string();
121 let requested_approver = request
122 .requested_approver
123 .as_deref()
124 .map(str::trim)
125 .filter(|value| !value.is_empty())
126 .map(ToString::to_string);
127 let approval_recipient = request
128 .approval_recipient
129 .as_deref()
130 .map(str::trim)
131 .filter(|value| !value.is_empty())
132 .map(ToString::to_string);
133 let approval_channel = request
134 .approval_channel
135 .as_deref()
136 .map(str::trim)
137 .filter(|value| !value.is_empty())
138 .map(ToString::to_string);
139 let entity = request
140 .entity
141 .as_deref()
142 .map(str::trim)
143 .filter(|value| !value.is_empty())
144 .map(ToString::to_string);
145 let topic = request
146 .topic
147 .as_deref()
148 .map(str::trim)
149 .filter(|value| !value.is_empty())
150 .map(ToString::to_string);
151 let action_sequence = self.next_gating_sequence();
152 let action_id = format!("gate-action-{action_sequence:06}");
153 let risk_tier = request.risk_tier.clone();
154
155 if matches!(request.risk_tier, GatingRiskTier::R2 | GatingRiskTier::R3) {
156 if !self.memory_conflicts.is_empty() && (entity.is_none() || topic.is_none()) {
157 self.append_gating_audit(GatingAuditEntry {
158 audit_id: String::new(),
159 timestamp_ms: 0,
160 event_type: "conflict_blocked".to_string(),
161 action_id: action_id.clone(),
162 pending_id: None,
163 actor_id: actor_id.clone(),
164 risk_tier: risk_tier.clone(),
165 outcome: GatingOutcome::SafeDraft,
166 detail: serde_json::json!({
167 "policy": "memory_conflict_context_required_v0_1",
168 "reason": "memory_conflict_context_missing",
169 "action": action,
170 "reference": {
171 "entity": entity,
172 "topic": topic,
173 },
174 "missing_context": {
175 "entity": entity.is_none(),
176 "topic": topic.is_none(),
177 },
178 "conflict_count": self.memory_conflicts.len(),
179 }),
180 });
181 return GatingEvaluateResult {
182 action_id,
183 action,
184 actor_id,
185 risk_tier,
186 outcome: GatingOutcome::SafeDraft,
187 pending_id: None,
188 fallback_reason: Some("memory_conflict_context_missing".to_string()),
189 };
190 }
191 let conflict = match self
192 .gating_memory_conflict_for_reference(entity.as_deref(), topic.as_deref())
193 {
194 Ok(conflict) => conflict,
195 Err(error) => {
196 self.append_gating_audit(GatingAuditEntry {
197 audit_id: String::new(),
198 timestamp_ms: 0,
199 event_type: "memory_conflict_lookup_failed".to_string(),
200 action_id: action_id.clone(),
201 pending_id: None,
202 actor_id: actor_id.clone(),
203 risk_tier: risk_tier.clone(),
204 outcome: GatingOutcome::SafeDraft,
205 detail: serde_json::json!({
206 "policy": "memory_conflict_lookup_via_core_mcp",
207 "reason": "memory_conflict_lookup_failed",
208 "error": format!("{error:?}"),
209 "reference": {
210 "entity": entity,
211 "topic": topic,
212 },
213 }),
214 });
215 return GatingEvaluateResult {
216 action_id,
217 action,
218 actor_id,
219 risk_tier,
220 outcome: GatingOutcome::SafeDraft,
221 pending_id: None,
222 fallback_reason: Some("memory_conflict_lookup_failed".to_string()),
223 };
224 }
225 };
226 if let Some(conflict) = conflict {
227 self.append_gating_audit(GatingAuditEntry {
228 audit_id: String::new(),
229 timestamp_ms: 0,
230 event_type: "conflict_blocked".to_string(),
231 action_id: action_id.clone(),
232 pending_id: None,
233 actor_id: actor_id.clone(),
234 risk_tier: risk_tier.clone(),
235 outcome: GatingOutcome::SafeDraft,
236 detail: serde_json::json!({
237 "policy": "memory_conflict_block_v0_1",
238 "reason": "memory_conflict",
239 "action": action,
240 "reference": {
241 "entity": entity,
242 "topic": topic,
243 },
244 "conflict": conflict,
245 }),
246 });
247 return GatingEvaluateResult {
248 action_id,
249 action,
250 actor_id,
251 risk_tier,
252 outcome: GatingOutcome::SafeDraft,
253 pending_id: None,
254 fallback_reason: Some("memory_conflict".to_string()),
255 };
256 }
257 }
258
259 match request.risk_tier {
260 GatingRiskTier::R0 | GatingRiskTier::R1 => {
261 self.append_gating_audit(GatingAuditEntry {
262 audit_id: String::new(),
263 timestamp_ms: 0,
264 event_type: "evaluated".to_string(),
265 action_id: action_id.clone(),
266 pending_id: None,
267 actor_id: actor_id.clone(),
268 risk_tier: risk_tier.clone(),
269 outcome: GatingOutcome::Allowed,
270 detail: serde_json::json!({
271 "policy": "allow_immediate",
272 "rationale": request.rationale,
273 "action": action,
274 }),
275 });
276 GatingEvaluateResult {
277 action_id,
278 action,
279 actor_id,
280 risk_tier,
281 outcome: GatingOutcome::Allowed,
282 pending_id: None,
283 fallback_reason: None,
284 }
285 }
286 GatingRiskTier::R2 => {
287 self.append_gating_audit(GatingAuditEntry {
288 audit_id: String::new(),
289 timestamp_ms: 0,
290 event_type: "evaluated".to_string(),
291 action_id: action_id.clone(),
292 pending_id: None,
293 actor_id: actor_id.clone(),
294 risk_tier: risk_tier.clone(),
295 outcome: GatingOutcome::AllowedWithAudit,
296 detail: serde_json::json!({
297 "policy": "consequence_mode_allow_with_audit_v0_1",
298 "rationale": request.rationale,
299 "action": action,
300 }),
301 });
302 GatingEvaluateResult {
303 action_id,
304 action,
305 actor_id,
306 risk_tier,
307 outcome: GatingOutcome::AllowedWithAudit,
308 pending_id: None,
309 fallback_reason: None,
310 }
311 }
312 GatingRiskTier::R3 => {
313 let pending_sequence = self.next_gating_sequence();
314 let pending_id = format!("gate-pending-{pending_sequence:06}");
315 let created_at_ms = current_time_ms();
316 let timeout_ms = request
317 .approval_timeout_ms
318 .unwrap_or(GATING_APPROVAL_TIMEOUT_DEFAULT_MS);
319 let mut approval_route_id = None;
320 let mut approval_delivery_id = None;
321 let mut approval_notification_error = None;
322
323 if let (Some(recipient), Some(channel)) =
324 (approval_recipient.as_ref(), approval_channel.as_ref())
325 {
326 if self.is_module_loaded("router") && self.is_module_loaded("delivery") {
327 match self.resolve_routing(RoutingResolveRequest {
328 recipient: recipient.clone(),
329 channel: Some(channel.clone()),
330 retry_max: None,
331 backoff_ms: None,
332 rate_limit_per_minute: None,
333 }) {
334 Ok(resolution) => {
335 approval_route_id = Some(resolution.route_id.clone());
336 match self.send_delivery(DeliverySendRequest {
337 resolution,
338 payload: serde_json::json!({
339 "kind": "gating_approval_request",
340 "pending_id": pending_id,
341 "action_id": action_id,
342 "action": action,
343 "actor_id": actor_id,
344 "risk_tier": risk_tier,
345 "requested_approver": requested_approver,
346 "deadline_at_ms": created_at_ms.saturating_add(timeout_ms),
347 }),
348 idempotency_key: Some(format!("gating-approval-{pending_id}")),
349 }) {
350 Ok(record) => {
351 if record.status == "sent" {
352 approval_delivery_id = Some(record.delivery_id);
353 } else {
354 approval_notification_error = Some(format!(
355 "delivery_status:{}:{}",
356 record.status, record.delivery_id
357 ));
358 }
359 }
360 Err(err) => {
361 approval_notification_error =
362 Some(format!("delivery:{err:?}"));
363 }
364 }
365 }
366 Err(err) => {
367 approval_notification_error = Some(format!("routing:{err:?}"));
368 }
369 }
370 } else {
371 let mut missing_modules = Vec::new();
372 if !self.is_module_loaded("router") {
373 missing_modules.push("router");
374 }
375 if !self.is_module_loaded("delivery") {
376 missing_modules.push("delivery");
377 }
378 approval_notification_error = Some(format!(
379 "notification_modules_unavailable:{}",
380 missing_modules.join(",")
381 ));
382 }
383 }
384 let pending_entry = GatingPendingEntry {
385 pending_id: pending_id.clone(),
386 action_id: action_id.clone(),
387 action: action.clone(),
388 actor_id: actor_id.clone(),
389 risk_tier: risk_tier.clone(),
390 requested_approver,
391 approval_recipient,
392 approval_channel,
393 approval_route_id,
394 approval_delivery_id,
395 created_at_ms,
396 deadline_at_ms: created_at_ms.saturating_add(timeout_ms),
397 };
398 self.upsert_gating_pending_entry(pending_entry.clone());
399 self.append_gating_audit(GatingAuditEntry {
400 audit_id: String::new(),
401 timestamp_ms: 0,
402 event_type: "pending_created".to_string(),
403 action_id: action_id.clone(),
404 pending_id: Some(pending_id.clone()),
405 actor_id: actor_id.clone(),
406 risk_tier: risk_tier.clone(),
407 outcome: GatingOutcome::PendingApproval,
408 detail: serde_json::json!({
409 "requested_approver": pending_entry.requested_approver,
410 "approval_recipient": pending_entry.approval_recipient,
411 "approval_channel": pending_entry.approval_channel,
412 "approval_route_id": pending_entry.approval_route_id,
413 "approval_delivery_id": pending_entry.approval_delivery_id,
414 "approval_notification_error": approval_notification_error,
415 "deadline_at_ms": pending_entry.deadline_at_ms,
416 "action": action,
417 }),
418 });
419 GatingEvaluateResult {
420 action_id,
421 action,
422 actor_id,
423 risk_tier,
424 outcome: GatingOutcome::PendingApproval,
425 pending_id: Some(pending_id),
426 fallback_reason: None,
427 }
428 }
429 }
430 }
431
432 pub fn list_gating_pending(&mut self) -> Vec<GatingPendingEntry> {
433 self.refresh_gating_timeouts();
434 self.gating_pending_order
435 .iter()
436 .filter_map(|pending_id| self.gating_pending.get(pending_id).cloned())
437 .collect()
438 }
439
440 pub fn decide_gating_action(
441 &mut self,
442 request: GatingDecideRequest,
443 ) -> Result<GatingDecisionResult, GatingDecideError> {
444 self.refresh_gating_timeouts();
445 let decision = request.decision.clone();
446 let reason = request.reason.clone();
447 let pending_id = request.pending_id.trim().to_string();
448 let approver_id = request.approver_id.trim().to_string();
449 let pending_entry = self
450 .gating_pending
451 .remove(&pending_id)
452 .ok_or_else(|| GatingDecideError::UnknownPendingId(pending_id.clone()))?;
453 self.gating_pending_order
454 .retain(|candidate| candidate != &pending_id);
455
456 if matches!(decision, GatingDecision::Approve) && approver_id == pending_entry.actor_id {
457 self.upsert_gating_pending_entry(pending_entry);
458 return Err(GatingDecideError::SelfApprovalForbidden);
459 }
460 if let Some(expected_approver) = pending_entry.requested_approver.as_deref()
461 && expected_approver != approver_id
462 {
463 let expected = expected_approver.to_string();
464 self.upsert_gating_pending_entry(pending_entry);
465 return Err(GatingDecideError::ApproverMismatch {
466 expected,
467 provided: approver_id,
468 });
469 }
470
471 let mut next_pending_id = None;
472 let (outcome, event_type) = match decision {
473 GatingDecision::Approve => (GatingOutcome::Allowed, "approval_decided"),
474 GatingDecision::Reject => (GatingOutcome::SafeDraft, "rejection_decided"),
475 GatingDecision::Escalate => {
476 let successor_sequence = self.next_gating_sequence();
477 let successor_pending_id = format!("gate-pending-{successor_sequence:06}");
478 let successor_entry = GatingPendingEntry {
479 pending_id: successor_pending_id.clone(),
480 action_id: pending_entry.action_id.clone(),
481 action: pending_entry.action.clone(),
482 actor_id: pending_entry.actor_id.clone(),
483 risk_tier: pending_entry.risk_tier.clone(),
484 requested_approver: None,
485 approval_recipient: pending_entry.approval_recipient.clone(),
486 approval_channel: pending_entry.approval_channel.clone(),
487 approval_route_id: None,
488 approval_delivery_id: None,
489 created_at_ms: current_time_ms(),
490 deadline_at_ms: pending_entry.deadline_at_ms,
491 };
492 self.upsert_gating_pending_entry(successor_entry.clone());
493 next_pending_id = Some(successor_pending_id.clone());
494 self.append_gating_audit(GatingAuditEntry {
495 audit_id: String::new(),
496 timestamp_ms: 0,
497 event_type: "pending_created".to_string(),
498 action_id: successor_entry.action_id.clone(),
499 pending_id: Some(successor_pending_id),
500 actor_id: successor_entry.actor_id.clone(),
501 risk_tier: successor_entry.risk_tier.clone(),
502 outcome: GatingOutcome::PendingApproval,
503 detail: serde_json::json!({
504 "escalated_from_pending_id": pending_id.clone(),
505 "requested_approver": successor_entry.requested_approver,
506 "approval_recipient": successor_entry.approval_recipient,
507 "approval_channel": successor_entry.approval_channel,
508 "approval_route_id": successor_entry.approval_route_id,
509 "approval_delivery_id": successor_entry.approval_delivery_id,
510 "deadline_at_ms": successor_entry.deadline_at_ms,
511 "action": successor_entry.action,
512 }),
513 });
514 (GatingOutcome::PendingApproval, "escalation_decided")
515 }
516 };
517 let decided_at_ms = current_time_ms();
518 self.append_gating_audit(GatingAuditEntry {
519 audit_id: String::new(),
520 timestamp_ms: 0,
521 event_type: event_type.to_string(),
522 action_id: pending_entry.action_id.clone(),
523 pending_id: Some(pending_id.clone()),
524 actor_id: pending_entry.actor_id.clone(),
525 risk_tier: pending_entry.risk_tier.clone(),
526 outcome: outcome.clone(),
527 detail: serde_json::json!({
528 "approver_id": approver_id,
529 "decision": decision,
530 "reason": reason,
531 "approval_route_id": pending_entry.approval_route_id,
532 "approval_delivery_id": pending_entry.approval_delivery_id,
533 "next_pending_id": next_pending_id,
534 }),
535 });
536 Ok(GatingDecisionResult {
537 pending_id,
538 action_id: pending_entry.action_id,
539 approver_id,
540 decision,
541 outcome,
542 decided_at_ms,
543 reason,
544 next_pending_id,
545 })
546 }
547
548 pub fn gating_audit_entries(&mut self, limit: usize) -> Vec<GatingAuditEntry> {
549 self.refresh_gating_timeouts();
550 self.gating_audit
551 .iter()
552 .rev()
553 .take(limit)
554 .cloned()
555 .collect::<Vec<_>>()
556 .into_iter()
557 .rev()
558 .collect()
559 }
560}