use serde::Serialize;
use serde_json::{json, Value};
use nexo_tool_meta::admin::processing::ProcessingScope;
use uuid::Uuid;
use super::{AdminClient, AdminError};
/// Arguments for a single reply sent through `HumanTakeover::send_reply`.
///
/// `send_reply` copies `body`, `msg_kind`, `attachments` and
/// `reply_to_msg_id` into the `action` object of the intervention request;
/// `session_id` is handled separately (see the field docs).
///
/// NOTE(review): `#[serde(default)]` only influences deserialization. This
/// type derives `Serialize` alone, so those attributes currently have no
/// effect — confirm whether a `Deserialize` derive was intended.
#[derive(Debug, Clone, Serialize)]
pub struct SendReplyArgs {
/// Message body, sent as `action.body`.
pub body: String,
/// Message kind, sent as `action.msg_kind`; `SendReplyArgs::text` sets it to `"text"`.
pub msg_kind: String,
/// Attachments as free-form JSON values, sent as `action.attachments`.
#[serde(default)]
pub attachments: Vec<Value>,
/// Sent as `action.reply_to_msg_id`; `None` is serialized as JSON `null`.
#[serde(default)]
pub reply_to_msg_id: Option<String>,
/// Per-call session id. When set it takes precedence over the session id
/// stored on the `HumanTakeover` handle; when neither is set, the request
/// carries no `session_id` key.
#[serde(default)]
pub session_id: Option<Uuid>,
}
impl SendReplyArgs {
    /// Creates arguments for a plain text reply.
    ///
    /// `msg_kind` is `"text"`, `attachments` is empty, and both
    /// `reply_to_msg_id` and `session_id` are `None`.
    pub fn text(body: impl Into<String>) -> Self {
        let body: String = body.into();
        let msg_kind = String::from("text");
        Self {
            body,
            msg_kind,
            attachments: Vec::new(),
            reply_to_msg_id: None,
            session_id: None,
        }
    }

    /// Returns these arguments with `session_id` replaced by
    /// `Some(session_id)`; every other field is carried over unchanged.
    pub fn with_session(self, session_id: Uuid) -> Self {
        Self {
            session_id: Some(session_id),
            ..self
        }
    }
}
/// Handle returned by `HumanTakeover::engage` after the pause request
/// succeeded.
///
/// It keeps the values that every follow-up request (`send_reply`,
/// `release`) sends again: the scope and the operator token hash, plus an
/// optional default session id.
#[derive(Debug)]
pub struct HumanTakeover<'a> {
/// Admin client used for every call issued through this handle.
admin: &'a AdminClient,
/// Scope sent as `scope` in every request.
scope: ProcessingScope,
/// Value sent as `operator_token_hash` in every request.
operator_token_hash: String,
/// Default session id; `None` until `with_session` is called.
session_id: Option<Uuid>,
}
impl<'a> HumanTakeover<'a> {
    /// Issues the pause request for `scope` and returns a takeover handle.
    ///
    /// Calls the `nexo/admin/processing/pause` admin method with `scope`,
    /// `operator_token_hash` and, when `reason` is `Some`, a `reason` field.
    /// The acknowledgement payload is parsed as JSON and discarded. The
    /// returned handle carries no session id; attach one with
    /// [`HumanTakeover::with_session`].
    ///
    /// # Errors
    ///
    /// Propagates the [`AdminError`] returned by the admin call.
    pub async fn engage(
        admin: &'a AdminClient,
        scope: ProcessingScope,
        operator_token_hash: impl Into<String>,
        reason: Option<String>,
    ) -> Result<HumanTakeover<'a>, AdminError> {
        let operator_token_hash = operator_token_hash.into();
        // `json!` serializes interpolated expressions through a reference, so
        // `scope` and `operator_token_hash` stay owned by this function and
        // can be moved into the handle below without cloning.
        let mut params = json!({
            "scope": scope,
            "operator_token_hash": operator_token_hash,
        });
        if let Some(r) = reason {
            params["reason"] = json!(r);
        }
        let _ack: Value = admin.call("nexo/admin/processing/pause", params).await?;
        Ok(HumanTakeover {
            admin,
            scope,
            operator_token_hash,
            session_id: None,
        })
    }

    /// Sets the default session id that `send_reply` and `release` put on
    /// the wire, and returns the updated handle.
    pub fn with_session(mut self, session_id: Uuid) -> Self {
        self.session_id = Some(session_id);
        self
    }

    /// Sends a `reply` action through `nexo/admin/processing/intervention`.
    ///
    /// The request carries this handle's scope and operator token hash and an
    /// `action` object built from `channel`, `account_id`, `to` and the
    /// fields of `args`. A `session_id` key is added only when one is
    /// available: `args.session_id` wins, otherwise the handle's own session
    /// id is used.
    ///
    /// Returns the raw JSON result of the admin call.
    ///
    /// # Errors
    ///
    /// Propagates the [`AdminError`] returned by the admin call.
    pub async fn send_reply(
        &self,
        channel: impl Into<String>,
        account_id: impl Into<String>,
        to: impl Into<String>,
        args: SendReplyArgs,
    ) -> Result<Value, AdminError> {
        let mut params = json!({
            "scope": self.scope,
            "operator_token_hash": self.operator_token_hash,
            "action": {
                "kind": "reply",
                "channel": channel.into(),
                "account_id": account_id.into(),
                "to": to.into(),
                "body": args.body,
                "msg_kind": args.msg_kind,
                "attachments": args.attachments,
                "reply_to_msg_id": args.reply_to_msg_id,
            },
        });
        // A per-call session id takes precedence over the handle's default.
        let effective_session = args.session_id.or(self.session_id);
        if let Some(sid) = effective_session {
            params["session_id"] = json!(sid);
        }
        self.admin
            .call("nexo/admin/processing/intervention", params)
            .await
    }

    /// Consumes the handle and calls `nexo/admin/processing/resume`.
    ///
    /// The request carries the scope and operator token hash, plus
    /// `session_id` when the handle has one and `summary_for_agent` when a
    /// summary is supplied. The response payload is parsed as JSON and
    /// discarded.
    ///
    /// # Errors
    ///
    /// Propagates the [`AdminError`] returned by the admin call. The handle
    /// is consumed even when the call fails.
    pub async fn release(self, summary_for_agent: Option<String>) -> Result<(), AdminError> {
        let mut params = json!({
            "scope": self.scope,
            "operator_token_hash": self.operator_token_hash,
        });
        if let Some(sid) = self.session_id {
            params["session_id"] = json!(sid);
        }
        if let Some(summary) = summary_for_agent {
            params["summary_for_agent"] = json!(summary);
        }
        let _: Value = self
            .admin
            .call("nexo/admin/processing/resume", params)
            .await?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::admin::AdminSender;
    use async_trait::async_trait;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Test transport: parses every outbound line as JSON and stores it in
    /// `sent` instead of delivering it anywhere.
    #[derive(Debug, Default)]
    struct ScriptedSender {
        sent: Mutex<Vec<Value>>,
    }

    #[async_trait]
    impl AdminSender for ScriptedSender {
        async fn send_line(&self, line: String) -> Result<(), AdminError> {
            let v: Value = serde_json::from_str(&line).unwrap();
            self.sent.lock().unwrap().push(v);
            Ok(())
        }
    }

    /// Conversation-level scope fixture shared by all tests.
    fn convo() -> ProcessingScope {
        ProcessingScope::Conversation {
            agent_id: "ana".into(),
            channel: "whatsapp".into(),
            account_id: "wa.0".into(),
            contact_id: "wa.55".into(),
            mcp_channel_source: None,
        }
    }

    /// Drives `future` while a background pump answers the requests it sends.
    ///
    /// The pump wakes every 5 ms (at most 200 times), drains the frames
    /// recorded by `sender`, asks `responder` for a result for each frame and
    /// feeds a JSON-RPC 2.0 response with the frame's own `id` back into
    /// `admin`. It stops early once `admin` reports no pending calls on two
    /// checks taken 5 ms apart. After `future` completes the pump is aborted
    /// and joined, and the future's result is returned.
    async fn run_with_responses(
        admin: AdminClient,
        sender: Arc<ScriptedSender>,
        responder: impl Fn(&Value) -> Value + Send + 'static,
        future: impl std::future::Future<Output = Result<(), AdminError>> + Send + 'static,
    ) -> Result<(), AdminError> {
        let admin_for_pump = admin.clone();
        let pump = tokio::spawn(async move {
            for _ in 0..200 {
                tokio::time::sleep(Duration::from_millis(5)).await;
                // Take the recorded frames inside a block so the mutex guard
                // is released before the responder runs.
                let to_handle: Vec<Value> = {
                    let mut sent = sender.sent.lock().unwrap();
                    std::mem::take(&mut *sent)
                };
                for frame in to_handle {
                    let id = frame["id"].as_str().unwrap().to_string();
                    let response = responder(&frame);
                    let resp = json!({
                        "jsonrpc": "2.0",
                        "id": id,
                        "result": response,
                    });
                    admin_for_pump.on_inbound_response(&id, &resp);
                }
                // Re-check after a short delay so a follow-up call issued
                // right after a response is not missed.
                if admin_for_pump.pending_len() == 0 {
                    tokio::time::sleep(Duration::from_millis(5)).await;
                    if admin_for_pump.pending_len() == 0 {
                        return;
                    }
                }
            }
        });
        let result = future.await;
        pump.abort();
        let _ = pump.await;
        result
    }

    /// engage → send_reply → release must hit pause, intervention and resume
    /// in that order.
    #[tokio::test]
    async fn engage_send_release_invokes_pause_intervention_resume_in_order() {
        let sender = Arc::new(ScriptedSender::default());
        let admin = AdminClient::with_timeout(sender.clone(), Duration::from_secs(2));
        let admin_for_calls = admin.clone();
        let captured: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let captured_clone = captured.clone();
        let task = async move {
            let takeover =
                HumanTakeover::engage(&admin_for_calls, convo(), "h0", Some("operator".into()))
                    .await?;
            takeover
                .send_reply("whatsapp", "wa.0", "wa.55", SendReplyArgs::text("hello"))
                .await?;
            takeover.release(None).await
        };
        let responder = move |frame: &Value| {
            captured_clone
                .lock()
                .unwrap()
                .push(frame["method"].as_str().unwrap().to_string());
            json!({ "changed": true, "correlation_id": "00000000-0000-0000-0000-000000000000" })
        };
        let result = run_with_responses(admin, sender, responder, task).await;
        result.expect("takeover round trip");
        let methods = captured.lock().unwrap().clone();
        assert_eq!(
            methods,
            vec![
                "nexo/admin/processing/pause".to_string(),
                "nexo/admin/processing/intervention".to_string(),
                "nexo/admin/processing/resume".to_string(),
            ]
        );
    }

    /// Pure constructor check; it awaits nothing, so it runs as a plain
    /// synchronous test without an async runtime.
    #[test]
    fn send_reply_args_text_helper_defaults_to_text_kind() {
        let a = SendReplyArgs::text("hi");
        assert_eq!(a.msg_kind, "text");
        assert!(a.attachments.is_empty());
        assert!(a.reply_to_msg_id.is_none());
        assert!(a.session_id.is_none());
    }

    /// A session id set on the reply arguments must appear as
    /// `params.session_id` in the intervention frame.
    #[tokio::test]
    async fn send_reply_with_session_threads_session_id_onto_wire() {
        let sender = Arc::new(ScriptedSender::default());
        let admin = AdminClient::with_timeout(sender.clone(), Duration::from_secs(2));
        let admin_for_calls = admin.clone();
        let captured: Arc<Mutex<Vec<Value>>> = Arc::new(Mutex::new(Vec::new()));
        let captured_clone = captured.clone();
        let session_id = Uuid::parse_str("33333333-3333-4333-8333-333333333333").unwrap();
        let task = async move {
            let takeover = HumanTakeover::engage(&admin_for_calls, convo(), "h0", None).await?;
            takeover
                .send_reply(
                    "whatsapp",
                    "wa.0",
                    "wa.55",
                    SendReplyArgs::text("hello").with_session(session_id),
                )
                .await?;
            Ok(())
        };
        let responder = move |frame: &Value| {
            captured_clone.lock().unwrap().push(frame.clone());
            json!({
                "changed": true,
                "correlation_id": "00000000-0000-0000-0000-000000000000",
                "transcript_stamped": true,
            })
        };
        let result = run_with_responses(admin, sender, responder, task).await;
        result.expect("send_reply with session round trip");
        let frames = captured.lock().unwrap().clone();
        let intervention = frames
            .iter()
            .find(|f| f["method"] == "nexo/admin/processing/intervention")
            .expect("intervention frame present");
        assert_eq!(
            intervention["params"]["session_id"],
            json!(session_id.to_string())
        );
    }

    /// The handle's session id and the supplied summary must both appear in
    /// the resume frame.
    #[tokio::test]
    async fn release_with_session_and_summary_threads_both_onto_wire() {
        let sender = Arc::new(ScriptedSender::default());
        let admin = AdminClient::with_timeout(sender.clone(), Duration::from_secs(2));
        let admin_for_calls = admin.clone();
        let captured: Arc<Mutex<Vec<Value>>> = Arc::new(Mutex::new(Vec::new()));
        let captured_clone = captured.clone();
        let session_id = Uuid::parse_str("44444444-4444-4444-8444-444444444444").unwrap();
        let task = async move {
            let takeover = HumanTakeover::engage(&admin_for_calls, convo(), "h0", None)
                .await?
                .with_session(session_id);
            takeover
                .release(Some("cliente confirmó dirección".into()))
                .await
        };
        let responder = move |frame: &Value| {
            captured_clone.lock().unwrap().push(frame.clone());
            json!({
                "changed": true,
                "correlation_id": "00000000-0000-0000-0000-000000000000",
                "transcript_stamped": true,
            })
        };
        run_with_responses(admin, sender, responder, task)
            .await
            .expect("release with summary round trip");
        let frames = captured.lock().unwrap().clone();
        let resume = frames
            .iter()
            .find(|f| f["method"] == "nexo/admin/processing/resume")
            .expect("resume frame present");
        assert_eq!(
            resume["params"]["session_id"],
            json!(session_id.to_string())
        );
        assert_eq!(
            resume["params"]["summary_for_agent"],
            json!("cliente confirmó dirección")
        );
    }

    /// Without a summary the resume frame keeps `session_id` but has no
    /// `summary_for_agent` key at all.
    #[tokio::test]
    async fn release_without_summary_omits_field_on_wire() {
        let sender = Arc::new(ScriptedSender::default());
        let admin = AdminClient::with_timeout(sender.clone(), Duration::from_secs(2));
        let admin_for_calls = admin.clone();
        let captured: Arc<Mutex<Vec<Value>>> = Arc::new(Mutex::new(Vec::new()));
        let captured_clone = captured.clone();
        let session_id = Uuid::parse_str("55555555-5555-4555-8555-555555555555").unwrap();
        let task = async move {
            let takeover = HumanTakeover::engage(&admin_for_calls, convo(), "h0", None)
                .await?
                .with_session(session_id);
            takeover.release(None).await
        };
        let responder = move |frame: &Value| {
            captured_clone.lock().unwrap().push(frame.clone());
            json!({ "changed": true, "correlation_id": "00000000-0000-0000-0000-000000000000" })
        };
        run_with_responses(admin, sender, responder, task)
            .await
            .expect("release no summary round trip");
        let frames = captured.lock().unwrap().clone();
        let resume = frames
            .iter()
            .find(|f| f["method"] == "nexo/admin/processing/resume")
            .expect("resume frame present");
        assert_eq!(
            resume["params"]["session_id"],
            json!(session_id.to_string())
        );
        assert!(resume["params"].get("summary_for_agent").is_none());
    }

    /// With no session id on either the handle or the arguments, the
    /// intervention frame has no `session_id` key.
    #[tokio::test]
    async fn send_reply_without_session_skips_field_on_wire() {
        let sender = Arc::new(ScriptedSender::default());
        let admin = AdminClient::with_timeout(sender.clone(), Duration::from_secs(2));
        let admin_for_calls = admin.clone();
        let captured: Arc<Mutex<Vec<Value>>> = Arc::new(Mutex::new(Vec::new()));
        let captured_clone = captured.clone();
        let task = async move {
            let takeover = HumanTakeover::engage(&admin_for_calls, convo(), "h0", None).await?;
            takeover
                .send_reply("whatsapp", "wa.0", "wa.55", SendReplyArgs::text("hi"))
                .await?;
            Ok(())
        };
        let responder = move |frame: &Value| {
            captured_clone.lock().unwrap().push(frame.clone());
            json!({ "changed": true, "correlation_id": "00000000-0000-0000-0000-000000000000" })
        };
        run_with_responses(admin, sender, responder, task)
            .await
            .expect("round trip");
        let frames = captured.lock().unwrap().clone();
        let intervention = frames
            .iter()
            .find(|f| f["method"] == "nexo/admin/processing/intervention")
            .expect("intervention frame present");
        assert!(intervention["params"].get("session_id").is_none());
    }
}