use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProcessingScope {
Conversation {
agent_id: String,
channel: String,
account_id: String,
contact_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
mcp_channel_source: Option<String>,
},
AgentBinding {
agent_id: String,
channel: String,
account_id: String,
},
Agent {
agent_id: String,
},
EventStream {
agent_id: String,
subject_pattern: String,
},
BatchQueue {
agent_id: String,
queue_name: String,
},
Custom {
agent_id: String,
scope_kind: String,
scope_id: String,
},
}
impl ProcessingScope {
pub fn is_v0_supported(&self) -> bool {
matches!(self, ProcessingScope::Conversation { .. })
}
pub fn agent_id(&self) -> &str {
match self {
ProcessingScope::Conversation { agent_id, .. }
| ProcessingScope::AgentBinding { agent_id, .. }
| ProcessingScope::Agent { agent_id }
| ProcessingScope::EventStream { agent_id, .. }
| ProcessingScope::BatchQueue { agent_id, .. }
| ProcessingScope::Custom { agent_id, .. } => agent_id,
}
}
}
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum InterventionAction {
Reply {
channel: String,
account_id: String,
to: String,
body: String,
msg_kind: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
attachments: Vec<Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
reply_to_msg_id: Option<String>,
},
SkipItem {
item_id: String,
reason: String,
},
OverrideOutput {
value: Value,
},
InjectInput {
content: Value,
},
Custom {
action_kind: String,
payload: Value,
},
}
impl InterventionAction {
pub fn is_v0_supported(&self) -> bool {
matches!(self, InterventionAction::Reply { .. })
}
}
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum ProcessingControlState {
AgentActive,
PausedByOperator {
scope: ProcessingScope,
paused_at_ms: u64,
operator_token_hash: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
reason: Option<String>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ProcessingPauseParams {
pub scope: ProcessingScope,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
pub operator_token_hash: String,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct ProcessingAck {
pub changed: bool,
pub correlation_id: Uuid,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub transcript_stamped: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub drained_pending: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ProcessingResumeParams {
pub scope: ProcessingScope,
pub operator_token_hash: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_id: Option<Uuid>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub summary_for_agent: Option<String>,
}
pub const PROCESSING_SUMMARY_MAX_LEN: usize = 4096;
pub const DEFAULT_PENDING_INBOUNDS_CAP: usize = 50;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PendingInbound {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message_id: Option<Uuid>,
pub from_contact_id: String,
pub body: String,
pub timestamp_ms: u64,
pub source_plugin: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ProcessingInterventionParams {
pub scope: ProcessingScope,
pub action: InterventionAction,
pub operator_token_hash: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_id: Option<Uuid>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ProcessingStateParams {
pub scope: ProcessingScope,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ProcessingStateResponse {
pub state: ProcessingControlState,
}
pub const PROCESSING_STATE_CHANGED_NOTIFY_METHOD: &str = "nexo/notify/processing_state_changed";
pub const PROCESSING_PAUSE_METHOD: &str = "nexo/admin/processing/pause";
pub const PROCESSING_RESUME_METHOD: &str = "nexo/admin/processing/resume";
pub const PROCESSING_INTERVENTION_METHOD: &str = "nexo/admin/processing/intervention";
pub const PROCESSING_STATE_METHOD: &str = "nexo/admin/processing/state";
#[cfg(test)]
mod tests {
use super::*;
fn conversation_scope() -> ProcessingScope {
ProcessingScope::Conversation {
agent_id: "ana".into(),
channel: "whatsapp".into(),
account_id: "55-1234".into(),
contact_id: "55-5678".into(),
mcp_channel_source: None,
}
}
#[test]
fn conversation_scope_round_trip_omits_unset_mcp_source() {
let s = conversation_scope();
let v = serde_json::to_value(&s).unwrap();
assert_eq!(v["kind"], "conversation");
assert!(v.get("mcp_channel_source").is_none());
let back: ProcessingScope = serde_json::from_value(v).unwrap();
assert_eq!(back, s);
}
#[test]
fn reply_action_round_trip_with_attachments_and_reply_to() {
let action = InterventionAction::Reply {
channel: "whatsapp".into(),
account_id: "55-1234".into(),
to: "55-5678".into(),
body: "ya te resuelvo".into(),
msg_kind: "text".into(),
attachments: vec![serde_json::json!({"url": "https://x"})],
reply_to_msg_id: Some("WAID:abc".into()),
};
let v = serde_json::to_value(&action).unwrap();
assert_eq!(v["kind"], "reply");
let back: InterventionAction = serde_json::from_value(v).unwrap();
assert_eq!(back, action);
}
#[test]
fn v0_supported_predicates_match_spec() {
assert!(conversation_scope().is_v0_supported());
assert!(!ProcessingScope::Agent {
agent_id: "ana".into()
}
.is_v0_supported());
assert!(matches!(
InterventionAction::Reply {
channel: "whatsapp".into(),
account_id: "a".into(),
to: "t".into(),
body: "b".into(),
msg_kind: "text".into(),
attachments: vec![],
reply_to_msg_id: None,
},
ref a if a.is_v0_supported()
));
let skip = InterventionAction::SkipItem {
item_id: "x".into(),
reason: "y".into(),
};
assert!(!skip.is_v0_supported());
}
#[test]
fn paused_state_round_trip_carries_token_hash() {
let st = ProcessingControlState::PausedByOperator {
scope: conversation_scope(),
paused_at_ms: 1_700_000_000_000,
operator_token_hash: "abcdef0123456789".into(),
reason: Some("escalated".into()),
};
let v = serde_json::to_value(&st).unwrap();
assert_eq!(v["state"], "paused_by_operator");
assert_eq!(v["operator_token_hash"], "abcdef0123456789");
let back: ProcessingControlState = serde_json::from_value(v).unwrap();
assert_eq!(back, st);
let active = ProcessingControlState::AgentActive;
let av = serde_json::to_value(&active).unwrap();
assert_eq!(av["state"], "agent_active");
assert_eq!(
PROCESSING_STATE_CHANGED_NOTIFY_METHOD,
"nexo/notify/processing_state_changed"
);
}
#[test]
fn intervention_params_round_trip_with_session_id() {
let p = ProcessingInterventionParams {
scope: conversation_scope(),
action: InterventionAction::Reply {
channel: "whatsapp".into(),
account_id: "55-1234".into(),
to: "55-5678".into(),
body: "ok".into(),
msg_kind: "text".into(),
attachments: vec![],
reply_to_msg_id: None,
},
operator_token_hash: "abcdef0123456789".into(),
session_id: Some(Uuid::nil()),
};
let v = serde_json::to_value(&p).unwrap();
assert_eq!(v["session_id"], "00000000-0000-0000-0000-000000000000");
let back: ProcessingInterventionParams = serde_json::from_value(v).unwrap();
assert_eq!(back, p);
}
#[test]
fn intervention_params_legacy_payload_without_session_id_deserializes() {
let raw = serde_json::json!({
"scope": {
"kind": "conversation",
"agent_id": "ana",
"channel": "whatsapp",
"account_id": "55-1234",
"contact_id": "55-5678",
},
"action": {
"kind": "reply",
"channel": "whatsapp",
"account_id": "55-1234",
"to": "55-5678",
"body": "ok",
"msg_kind": "text",
},
"operator_token_hash": "abcdef0123456789",
});
let p: ProcessingInterventionParams = serde_json::from_value(raw).unwrap();
assert!(p.session_id.is_none());
let s = serde_json::to_string(&p).unwrap();
assert!(!s.contains("session_id"));
}
#[test]
fn resume_params_round_trip_with_session_and_summary() {
let p = ProcessingResumeParams {
scope: conversation_scope(),
operator_token_hash: "h".into(),
session_id: Some(Uuid::nil()),
summary_for_agent: Some("cliente confirmó dirección".into()),
};
let v = serde_json::to_value(&p).unwrap();
assert_eq!(v["session_id"], "00000000-0000-0000-0000-000000000000");
assert_eq!(v["summary_for_agent"], "cliente confirmó dirección");
let back: ProcessingResumeParams = serde_json::from_value(v).unwrap();
assert_eq!(back, p);
}
#[test]
fn resume_params_legacy_payload_without_new_fields_deserializes() {
let raw = serde_json::json!({
"scope": {
"kind": "conversation",
"agent_id": "ana",
"channel": "whatsapp",
"account_id": "55-1234",
"contact_id": "55-5678",
},
"operator_token_hash": "h",
});
let p: ProcessingResumeParams = serde_json::from_value(raw).unwrap();
assert!(p.session_id.is_none());
assert!(p.summary_for_agent.is_none());
let s = serde_json::to_string(&p).unwrap();
assert!(!s.contains("session_id"));
assert!(!s.contains("summary_for_agent"));
}
#[test]
fn pending_inbound_round_trip_omits_unset_message_id() {
let p = PendingInbound {
message_id: None,
from_contact_id: "wa.55".into(),
body: "hola".into(),
timestamp_ms: 1_700_000_000_000,
source_plugin: "whatsapp".into(),
};
let s = serde_json::to_string(&p).unwrap();
assert!(!s.contains("message_id"));
let back: PendingInbound = serde_json::from_str(&s).unwrap();
assert_eq!(back, p);
}
#[test]
fn ack_drained_pending_round_trip_with_value_and_absent() {
let with = ProcessingAck {
changed: true,
correlation_id: Uuid::nil(),
transcript_stamped: None,
drained_pending: Some(7),
};
let s = serde_json::to_string(&with).unwrap();
assert!(s.contains("\"drained_pending\":7"));
let back: ProcessingAck = serde_json::from_str(&s).unwrap();
assert_eq!(back, with);
let without = ProcessingAck {
changed: false,
correlation_id: Uuid::nil(),
transcript_stamped: None,
drained_pending: None,
};
let s = serde_json::to_string(&without).unwrap();
assert!(!s.contains("drained_pending"));
}
#[test]
fn ack_round_trip_with_transcript_stamped_present_and_absent() {
let with = ProcessingAck {
changed: true,
correlation_id: Uuid::nil(),
transcript_stamped: Some(true),
drained_pending: None,
};
let s = serde_json::to_string(&with).unwrap();
assert!(s.contains("transcript_stamped"));
let back: ProcessingAck = serde_json::from_str(&s).unwrap();
assert_eq!(back, with);
let without = ProcessingAck {
changed: false,
correlation_id: Uuid::nil(),
transcript_stamped: None,
drained_pending: None,
};
let s = serde_json::to_string(&without).unwrap();
assert!(!s.contains("transcript_stamped"));
let back: ProcessingAck = serde_json::from_str(&s).unwrap();
assert_eq!(back, without);
}
}