use crate::mcp::param_names;
use crate::mcp::registry::McpTool;
use crate::models::field_names;
use crate::{db, validate};
use schemars::JsonSchema;
use serde::Deserialize;
use serde_json::{Value, json};
// Request parameters for the pending-list tool. `PendingListTool::input_schema`
// derives the tool's JSON schema from this struct.
// NOTE: plain `//` comments are used on purpose. With schemars, `///` doc comments
// on a `JsonSchema` type are emitted as schema `description`s, which would change
// the generated schema.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[allow(dead_code)]
pub struct PendingListRequest {
// Optional status filter; a missing field deserializes to `None`.
#[serde(default)]
pub status: Option<String>,
// Optional page size; a missing field deserializes to `None`.
// NOTE(review): `handle_pending_list` reads `limit` from the raw JSON with
// `as_u64`, so a negative value falls back to the default page size there.
#[serde(default)]
pub limit: Option<i64>,
}
#[allow(dead_code)]
pub struct PendingListTool;
impl McpTool for PendingListTool {
fn name() -> &'static str {
crate::mcp::registry::tool_names::MEMORY_PENDING_LIST
}
fn description() -> &'static str {
"List pending governance-queued actions."
}
fn docs() -> &'static str {
"Task 1.9: list governance-queued actions. status filter (default pending). Limit cap 1000."
}
fn input_schema() -> Value {
crate::mcp::registry::input_schema_for::<PendingListRequest>()
}
fn family() -> &'static str {
crate::profile::Family::Governance.name()
}
}
// Request parameters for the pending-approve tool. `PendingApproveTool::input_schema`
// derives the tool's JSON schema from this struct.
// NOTE: plain `//` comments are used on purpose. With schemars, `///` doc comments
// on a `JsonSchema` type are emitted as schema `description`s, which would change
// the generated schema.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[allow(dead_code)]
pub struct PendingApproveRequest {
// Identifier of the pending action to approve (no serde default, so required).
pub id: String,
// Optional "remember" mode; a missing field deserializes to `None`.
// `parse_remember_param` in this file recognises "once", "session" and "forever".
#[serde(default)]
pub remember: Option<String>,
}
#[allow(dead_code)]
pub struct PendingApproveTool;
impl McpTool for PendingApproveTool {
fn name() -> &'static str {
crate::mcp::registry::tool_names::MEMORY_PENDING_APPROVE
}
fn description() -> &'static str {
"Approve a pending action; `remember` auto-decides next time."
}
fn docs() -> &'static str {
"Task 1.9 approve. decided_by = caller. K10: remember (once|session|forever) writes a synthetic permit rule."
}
fn input_schema() -> Value {
crate::mcp::registry::input_schema_for::<PendingApproveRequest>()
}
fn family() -> &'static str {
crate::profile::Family::Governance.name()
}
}
// Request parameters for the pending-reject tool. `PendingRejectTool::input_schema`
// derives the tool's JSON schema from this struct.
// NOTE: plain `//` comments are used on purpose. With schemars, `///` doc comments
// on a `JsonSchema` type are emitted as schema `description`s, which would change
// the generated schema.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[allow(dead_code)]
pub struct PendingRejectRequest {
// Identifier of the pending action to reject (no serde default, so required).
pub id: String,
// Optional "remember" mode; a missing field deserializes to `None`.
// `parse_remember_param` in this file recognises "once", "session" and "forever".
#[serde(default)]
pub remember: Option<String>,
}
#[allow(dead_code)]
pub struct PendingRejectTool;
impl McpTool for PendingRejectTool {
fn name() -> &'static str {
crate::mcp::registry::tool_names::MEMORY_PENDING_REJECT
}
fn description() -> &'static str {
"Reject a pending action; `remember` auto-decides next time."
}
fn docs() -> &'static str {
"Task 1.9 reject. decided_by = caller. K10: remember writes a synthetic deny rule."
}
fn input_schema() -> Value {
crate::mcp::registry::input_schema_for::<PendingRejectRequest>()
}
fn family() -> &'static str {
crate::profile::Family::Governance.name()
}
}
/// Lists subscription dead-letter-queue rows that belong to the calling agent.
///
/// `params` may carry a subscription-id filter (key `param_names::SUBSCRIPTION_ID`)
/// and a `limit`. The limit defaults to `PENDING_DEFAULT_PAGE_LIMIT` and is clamped
/// to `1..=LIST_MAX_LIMIT`. The caller identity is resolved from `mcp_client`.
///
/// Only rows whose subscription owner equals the caller are returned. A filtered
/// request for a subscription the caller does not own yields an empty page rather
/// than an error, and the DLQ is not queried at all in that case.
///
/// The response object contains `count`, the echoed subscription id, the effective
/// `limit` and the `entries` array (at most `limit` rows, in `list_dlq` order).
///
/// # Errors
/// Returns the stringified error when identity resolution, an owner lookup or the
/// DLQ query fails.
pub fn handle_subscription_dlq_list(
    conn: &rusqlite::Connection,
    params: &Value,
    mcp_client: Option<&str>,
) -> Result<Value, String> {
    let subscription_id = params[param_names::SUBSCRIPTION_ID].as_str();
    let limit = params["limit"]
        .as_u64()
        .map_or(crate::storage::PENDING_DEFAULT_PAGE_LIMIT, |v| {
            usize::try_from(v).unwrap_or(usize::MAX)
        })
        .clamp(1, crate::storage::LIST_MAX_LIMIT);
    let caller = crate::identity::resolve_agent_id(None, mcp_client).map_err(|e| e.to_string())?;

    let rows = if let Some(sid) = subscription_id {
        // Authorise before querying so a non-owner never causes rows to be loaded.
        let owner = crate::subscriptions::get_owner(conn, sid).map_err(|e| e.to_string())?;
        if owner.as_deref() != Some(caller.as_str()) {
            return Ok(json!({
                "count": 0,
                (field_names::SUBSCRIPTION_ID): subscription_id,
                "limit": limit,
                "entries": Vec::<Value>::new(),
            }));
        }
        let mut owned = crate::subscriptions::list_dlq(conn, subscription_id)
            .map_err(|e| e.to_string())?;
        owned.truncate(limit);
        owned
    } else {
        let rows_all =
            crate::subscriptions::list_dlq(conn, subscription_id).map_err(|e| e.to_string())?;
        // Cache the ownership verdict per subscription so each owner is looked up once.
        let mut owned_by_caller: std::collections::HashMap<String, bool> =
            std::collections::HashMap::new();
        let mut out = Vec::with_capacity(rows_all.len().min(limit));
        for row in rows_all {
            // The page is full: further owner lookups could not change the result.
            if out.len() == limit {
                break;
            }
            let is_owned = match owned_by_caller.get(&row.subscription_id).copied() {
                Some(known) => known,
                None => {
                    let owner = crate::subscriptions::get_owner(conn, &row.subscription_id)
                        .map_err(|e| e.to_string())?;
                    let verdict = owner.as_deref() == Some(caller.as_str());
                    owned_by_caller.insert(row.subscription_id.clone(), verdict);
                    verdict
                }
            };
            if is_owned {
                out.push(row);
            }
        }
        out
    };

    Ok(json!({
        "count": rows.len(),
        (field_names::SUBSCRIPTION_ID): subscription_id,
        "limit": limit,
        "entries": rows,
    }))
}
pub(super) fn handle_pending_list(
conn: &rusqlite::Connection,
params: &Value,
) -> Result<Value, String> {
let status = params["status"].as_str();
let limit = params["limit"]
.as_u64()
.map_or(crate::storage::PENDING_DEFAULT_PAGE_LIMIT, |v| {
usize::try_from(v).unwrap_or(usize::MAX)
})
.min(crate::storage::LIST_MAX_LIMIT);
let items = db::list_pending_actions(conn, status, limit).map_err(|e| e.to_string())?;
Ok(json!({"count": items.len(), "pending": items}))
}
fn parse_remember_param(params: &Value) -> crate::approvals::Remember {
match params["remember"].as_str() {
Some("session") => crate::approvals::Remember::Session,
Some("forever") => crate::approvals::Remember::Forever,
Some("once") | None => crate::approvals::Remember::Once,
Some(other) => {
tracing::warn!(
"memory_pending_*: unknown remember value {other:?}, defaulting to once"
);
crate::approvals::Remember::Once
}
}
}
fn record_mcp_decision(
conn: &rusqlite::Connection,
pending_id: &str,
decided_by: &str,
decision_label: &str,
remember: crate::approvals::Remember,
) {
let pa = crate::db::get_pending_action(conn, pending_id)
.ok()
.flatten();
let remember_label = match remember {
crate::approvals::Remember::Once => "once",
crate::approvals::Remember::Session => "session",
crate::approvals::Remember::Forever => "forever",
};
let evt_namespace = pa.as_ref().map(|p| p.namespace.clone()).unwrap_or_default();
let evt_requested_by = pa
.as_ref()
.map(|p| p.requested_by.clone())
.unwrap_or_default();
crate::approvals::publish(crate::approvals::ApprovalEvent::ApprovalDecided {
pending_id: pending_id.to_string(),
decision: decision_label.to_string(),
decided_by: decided_by.to_string(),
remember: remember_label.to_string(),
namespace: evt_namespace,
requested_by: evt_requested_by,
});
if matches!(
remember,
crate::approvals::Remember::Forever | crate::approvals::Remember::Session
) && let Some(snap) = pa
{
crate::approvals::record_synthetic_rule(crate::approvals::SyntheticPermissionRule {
action_type: snap.action_type,
namespace: snap.namespace,
agent_id: Some(snap.requested_by),
decision: decision_label.to_string(),
recorded_at: chrono::Utc::now().to_rfc3339(),
});
}
}
pub fn handle_pending_approve(
conn: &rusqlite::Connection,
params: &Value,
mcp_client: Option<&str>,
) -> Result<Value, String> {
use crate::db::ApproveOutcome;
let id = params["id"]
.as_str()
.ok_or(crate::errors::msg::ID_REQUIRED)?;
validate::validate_id(id).map_err(|e| e.to_string())?;
let agent_id = crate::identity::resolve_agent_id(params["agent_id"].as_str(), mcp_client)
.map_err(|e| e.to_string())?;
let remember = parse_remember_param(params);
crate::governance::audit::record_decision(
&agent_id,
"allow",
"pending_approve",
"",
json!({ (field_names::PENDING_ID): id }),
);
match db::approve_with_approver_type(conn, id, &agent_id).map_err(|e| e.to_string())? {
ApproveOutcome::Approved => {
let executed = db::execute_pending_action(conn, id).map_err(|e| e.to_string())?;
record_mcp_decision(conn, id, &agent_id, "approve", remember);
Ok(json!({
"approved": true,
"id": id,
(field_names::DECIDED_BY): agent_id,
"executed": true,
"memory_id": executed,
"remember": match remember {
crate::approvals::Remember::Once => "once",
crate::approvals::Remember::Session => "session",
crate::approvals::Remember::Forever => "forever",
},
}))
}
ApproveOutcome::Pending { votes, quorum } => Ok(json!({
"approved": false,
"status": "pending",
"id": id,
"votes": votes,
"quorum": quorum,
"reason": crate::errors::msg::CONSENSUS_NOT_REACHED,
})),
ApproveOutcome::NotFound => Err(crate::errors::msg::pending_action_not_found(id)),
ApproveOutcome::Rejected(reason) => Err(crate::errors::msg::approve_rejected(reason)),
}
}
// Request parameters for the subscription DLQ list tool.
// `SubscriptionDlqListTool::input_schema` derives the tool's JSON schema from this struct.
// NOTE: plain `//` comments are used on purpose. With schemars, `///` doc comments
// on a `JsonSchema` type are emitted as schema `description`s, which would change
// the generated schema.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[allow(dead_code)]
pub struct SubscriptionDlqListRequest {
// Optional subscription filter; a missing field deserializes to `None`.
#[serde(default)]
pub subscription_id: Option<String>,
// Optional page size; a missing field deserializes to `None`.
// NOTE(review): `handle_subscription_dlq_list` reads `limit` from the raw JSON with
// `as_u64`, so a negative value falls back to the default page size there.
#[serde(default)]
pub limit: Option<i64>,
}
#[allow(dead_code)]
pub struct SubscriptionDlqListTool;
impl McpTool for SubscriptionDlqListTool {
fn name() -> &'static str {
crate::mcp::registry::tool_names::MEMORY_SUBSCRIPTION_DLQ_LIST
}
fn description() -> &'static str {
"List subscription_dlq rows (exhausted retry ladder)."
}
fn docs() -> &'static str {
"K7: DLQ inspector."
}
fn input_schema() -> Value {
crate::mcp::registry::input_schema_for::<SubscriptionDlqListRequest>()
}
fn family() -> &'static str {
crate::profile::Family::Power.name()
}
}
/// Parity and metadata checks for the subscription DLQ list tool.
#[cfg(test)]
mod d1_5_986_tests {
    use super::*;
    use crate::mcp::parity_test_helpers::{
        assert_descriptions_match, assert_property_set_parity, derived_props_for,
    };

    /// Tool name used by every check in this module.
    const TOOL: &str = "memory_subscription_dlq_list";

    /// The derived request properties must match the parity helpers' expectations.
    #[test]
    fn subscription_dlq_list_parity_986() {
        let props = derived_props_for::<SubscriptionDlqListRequest>();
        assert_property_set_parity(TOOL, &props);
        assert_descriptions_match(TOOL, &props);
    }

    /// The descriptor must report the expected tool name and family.
    #[test]
    fn subscription_dlq_list_tool_metadata_986() {
        let reported_name = SubscriptionDlqListTool::name();
        let reported_family = SubscriptionDlqListTool::family();
        assert_eq!(reported_name, TOOL);
        assert_eq!(reported_family, "power");
    }
}
// Handler-level tests for the pending-action and subscription-DLQ handlers.
// Every test runs against a fresh in-memory database.
#[cfg(test)]
mod tests {
use super::*;
use crate::models::Tier;
// Within this module `db` refers to `crate::storage`; the explicit import takes
// precedence over the `db` brought in by the `super::*` glob.
use crate::storage as db;
use serde_json::json;
// Opens a new in-memory database for a single test.
fn fresh_conn() -> rusqlite::Connection {
db::open(std::path::Path::new(":memory:")).expect("open in-memory db")
}
// Queues a `Reflect` pending action in namespace "pa-ns" for `requester` and
// returns the value produced by `queue_pending_action` (used as the pending id).
fn queue_pending(conn: &rusqlite::Connection, requester: &str) -> String {
db::queue_pending_action(
conn,
crate::models::GovernedAction::Reflect,
"pa-ns",
None,
requester,
&json!({"k": "v"}),
)
.expect("queue")
}
// Queues a `Promote` pending action carrying a fixed UUID-shaped fourth argument.
// NOTE(review): presumably that UUID is a memory id that does not exist in the
// fresh database ("unbound") — confirm against `queue_pending_action`.
fn queue_pending_promote_unbound(conn: &rusqlite::Connection, requester: &str) -> String {
db::queue_pending_action(
conn,
crate::models::GovernedAction::Promote,
"pa-ns",
Some("11111111-2222-3333-4444-555555555555"),
requester,
&json!({"target_tier": Tier::Long.as_str()}),
)
.expect("queue")
}
// --- parse_remember_param ---
#[test]
fn parse_remember_param_returns_session() {
let r = super::parse_remember_param(&json!({"remember": "session"}));
assert!(matches!(r, crate::approvals::Remember::Session));
}
#[test]
fn parse_remember_param_returns_forever() {
let r = super::parse_remember_param(&json!({"remember": "forever"}));
assert!(matches!(r, crate::approvals::Remember::Forever));
}
#[test]
fn parse_remember_param_returns_once_when_explicit() {
let r = super::parse_remember_param(&json!({"remember": "once"}));
assert!(matches!(r, crate::approvals::Remember::Once));
}
#[test]
fn parse_remember_param_returns_once_when_absent() {
let r = super::parse_remember_param(&json!({}));
assert!(matches!(r, crate::approvals::Remember::Once));
}
// Unknown strings fall back to `Once` instead of failing.
#[test]
fn parse_remember_param_unknown_defaults_to_once() {
let r = super::parse_remember_param(&json!({"remember": "weird-value"}));
assert!(matches!(r, crate::approvals::Remember::Once));
}
// --- handle_subscription_dlq_list ---
// An empty database yields a zero count and an array of entries.
#[test]
fn subscription_dlq_list_empty() {
let conn = fresh_conn();
let resp = handle_subscription_dlq_list(&conn, &json!({}), None).expect("ok");
assert_eq!(resp["count"].as_u64(), Some(0));
assert!(resp["entries"].is_array());
}
// A requested limit of 0 is raised to at least 1 in the echoed `limit`.
#[test]
fn subscription_dlq_list_limit_clamped() {
let conn = fresh_conn();
let resp = handle_subscription_dlq_list(&conn, &json!({"limit": 0u64}), None).expect("ok");
assert!(resp["limit"].as_u64().unwrap() >= 1);
}
// The subscription filter is echoed back in the response.
#[test]
fn subscription_dlq_list_with_filter() {
let conn = fresh_conn();
let resp = handle_subscription_dlq_list(&conn, &json!({"subscription_id": "sub-x"}), None)
.expect("ok");
assert_eq!(resp["subscription_id"].as_str(), Some("sub-x"));
}
// A DLQ row of a subscription created by "ai:alice" must not be visible to a
// caller connecting as "ai:bob-client", with or without the subscription filter.
#[test]
fn subscription_dlq_list_cross_tenant_refused_1118() {
let conn = fresh_conn();
db::register_agent(&conn, "ai:alice", "test", &[]).expect("register alice");
let sid = crate::subscriptions::insert(
&conn,
&crate::subscriptions::NewSubscription {
url: "https://example.com/alice",
events: "memory_store",
secret: Some("sek-alice"),
namespace_filter: None,
agent_filter: None,
created_by: Some("ai:alice"),
event_types: None,
},
)
.expect("insert alice sub");
// Insert the DLQ row directly with SQL rather than through a helper.
conn.execute(
"INSERT INTO subscription_dlq \
(subscription_id, correlation_id, event_type, payload, retry_count, last_error, first_failed_at, last_failed_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
rusqlite::params![&sid, "alice-corr-1", "memory_store", "{\"id\":\"m1\"}", 3i64, "5xx", "2026-01-01T00:00:00Z", "2026-01-01T00:00:00Z"],
).expect("record dlq");
// Filtered request by a different caller: empty page, not an error.
let resp = handle_subscription_dlq_list(
&conn,
&json!({"subscription_id": sid}),
Some("ai:bob-client"),
)
.expect("ok");
assert_eq!(resp["count"].as_u64(), Some(0));
assert!(resp["entries"].as_array().unwrap().is_empty());
// Unfiltered request by the same caller: alice's row is filtered out as well.
let resp_unfiltered =
handle_subscription_dlq_list(&conn, &json!({}), Some("ai:bob-client")).expect("ok");
assert_eq!(resp_unfiltered["count"].as_u64(), Some(0));
}
// --- handle_pending_list ---
#[test]
fn pending_list_returns_count_and_array() {
let conn = fresh_conn();
let _id = queue_pending(&conn, "ai:tester");
let resp = handle_pending_list(&conn, &json!({})).expect("ok");
assert!(resp["count"].as_u64().unwrap() >= 1);
assert!(resp["pending"].is_array());
}
// Passes an explicit status filter together with a large requested limit (5000).
#[test]
fn pending_list_with_status_and_limit() {
let conn = fresh_conn();
let _id = queue_pending(&conn, "ai:tester");
let resp = handle_pending_list(&conn, &json!({"status": "pending", "limit": 5000u64}))
.expect("ok");
assert!(resp["count"].as_u64().unwrap() >= 1);
}
// --- handle_pending_approve ---
// NOTE(review): this test accepts either outcome. On success it checks the
// response fields; on failure it only requires a non-empty error message.
#[test]
fn pending_approve_reaches_execute_step() {
let conn = fresh_conn();
let id = queue_pending_promote_unbound(&conn, "ai:tester");
let result = handle_pending_approve(
&conn,
&json!({"id": id, "agent_id": "ai:approver", "remember": "forever"}),
None,
);
match result {
Ok(resp) => {
assert_eq!(resp["approved"], true);
assert_eq!(resp["remember"].as_str(), Some("forever"));
}
Err(e) => assert!(!e.is_empty()),
}
}
#[test]
fn pending_approve_missing_id_errors() {
let conn = fresh_conn();
let err = handle_pending_approve(&conn, &json!({}), None).unwrap_err();
assert!(err.contains("id"), "got: {err}");
}
// A whitespace-only id must be refused with some error message.
#[test]
fn pending_approve_invalid_id_rejected() {
let conn = fresh_conn();
let err = handle_pending_approve(&conn, &json!({"id": " "}), None).unwrap_err();
assert!(!err.is_empty());
}
#[test]
fn pending_approve_unknown_id_rejected() {
let conn = fresh_conn();
let err = handle_pending_approve(
&conn,
&json!({"id": "00000000-0000-0000-0000-000000000000"}),
None,
)
.unwrap_err();
assert!(err.contains("pending action not found"), "got: {err}");
}
// --- handle_pending_reject (defined further down in this file) ---
#[test]
fn pending_reject_happy_path() {
let conn = fresh_conn();
let id = queue_pending(&conn, "ai:tester");
let resp = handle_pending_reject(
&conn,
&json!({"id": id, "agent_id": "ai:rejecter", "remember": "session"}),
None,
)
.expect("ok");
assert_eq!(resp["rejected"], true);
assert_eq!(resp["remember"].as_str(), Some("session"));
}
#[test]
fn pending_reject_default_remember_is_once() {
let conn = fresh_conn();
let id = queue_pending(&conn, "ai:tester");
let resp =
handle_pending_reject(&conn, &json!({"id": id, "agent_id": "ai:rejecter"}), None)
.expect("ok");
assert_eq!(resp["remember"].as_str(), Some("once"));
}
#[test]
fn pending_reject_missing_id_errors() {
let conn = fresh_conn();
let err = handle_pending_reject(&conn, &json!({}), None).unwrap_err();
assert!(err.contains("id"), "got: {err}");
}
// Either wording of the error message is accepted for an unknown id.
#[test]
fn pending_reject_unknown_id_errors() {
let conn = fresh_conn();
let err = handle_pending_reject(
&conn,
&json!({"id": "00000000-0000-0000-0000-000000000000"}),
None,
)
.unwrap_err();
assert!(
err.contains("not found") || err.contains("already decided"),
"got: {err}"
);
}
}
pub fn handle_pending_reject(
conn: &rusqlite::Connection,
params: &Value,
mcp_client: Option<&str>,
) -> Result<Value, String> {
let id = params["id"]
.as_str()
.ok_or(crate::errors::msg::ID_REQUIRED)?;
validate::validate_id(id).map_err(|e| e.to_string())?;
let agent_id = crate::identity::resolve_agent_id(params["agent_id"].as_str(), mcp_client)
.map_err(|e| e.to_string())?;
let remember = parse_remember_param(params);
crate::governance::audit::record_decision(
&agent_id,
"refuse",
"pending_reject",
"",
json!({ "pending_id": id }),
);
let transitioned =
db::decide_pending_action(conn, id, false, &agent_id).map_err(|e| e.to_string())?;
if !transitioned {
return Err(format!("pending action not found or already decided: {id}"));
}
record_mcp_decision(conn, id, &agent_id, "deny", remember);
Ok(json!({
"rejected": true,
"id": id,
"decided_by": agent_id,
"remember": match remember {
crate::approvals::Remember::Once => "once",
crate::approvals::Remember::Session => "session",
crate::approvals::Remember::Forever => "forever",
},
}))
}
/// Parity and metadata checks for the three pending-action tools.
#[cfg(test)]
mod d1_4_985_tests {
    use super::*;
    use crate::mcp::d1_4_985_helpers::{
        assert_descriptions_match, assert_property_set_parity, derived_props_for,
    };

    /// Tool names used by the checks below.
    const LIST_TOOL: &str = "memory_pending_list";
    const APPROVE_TOOL: &str = "memory_pending_approve";
    const REJECT_TOOL: &str = "memory_pending_reject";
    /// Family every pending-action tool is expected to report.
    const FAMILY: &str = "governance";

    #[test]
    fn memory_pending_list_parity_985() {
        let props = derived_props_for::<PendingListRequest>();
        assert_property_set_parity(LIST_TOOL, &props);
        assert_descriptions_match(LIST_TOOL, &props);
    }

    #[test]
    fn memory_pending_list_tool_metadata_985() {
        let reported = (PendingListTool::name(), PendingListTool::family());
        assert_eq!(reported.0, LIST_TOOL);
        assert_eq!(reported.1, FAMILY);
    }

    #[test]
    fn memory_pending_approve_parity_985() {
        let props = derived_props_for::<PendingApproveRequest>();
        assert_property_set_parity(APPROVE_TOOL, &props);
        assert_descriptions_match(APPROVE_TOOL, &props);
    }

    #[test]
    fn memory_pending_approve_tool_metadata_985() {
        let reported = (PendingApproveTool::name(), PendingApproveTool::family());
        assert_eq!(reported.0, APPROVE_TOOL);
        assert_eq!(reported.1, FAMILY);
    }

    #[test]
    fn memory_pending_reject_parity_985() {
        let props = derived_props_for::<PendingRejectRequest>();
        assert_property_set_parity(REJECT_TOOL, &props);
        assert_descriptions_match(REJECT_TOOL, &props);
    }

    #[test]
    fn memory_pending_reject_tool_metadata_985() {
        let reported = (PendingRejectTool::name(), PendingRejectTool::family());
        assert_eq!(reported.0, REJECT_TOOL);
        assert_eq!(reported.1, FAMILY);
    }
}