mod agent;
pub(crate) mod follow_up_question;
pub(crate) mod handler;
pub(crate) mod store;
pub use agent::WhatsAppAgent;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
type PendingWhatsAppQuestion = (tokio::sync::oneshot::Sender<String>, Vec<String>);
use whatsapp_rust::client::Client;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaApproval {
Yes,
Always,
Yolo,
No,
}
pub struct WhatsAppState {
client: Mutex<Option<Arc<Client>>>,
owner_jid: Mutex<Option<String>>,
pub pending_approvals: Mutex<HashMap<String, tokio::sync::oneshot::Sender<WaApproval>>>,
pub pending_questions: Mutex<HashMap<String, PendingWhatsAppQuestion>>,
cancel_tokens: Mutex<HashMap<Uuid, CancellationToken>>,
qr_tx: tokio::sync::broadcast::Sender<String>,
connected_tx: tokio::sync::broadcast::Sender<()>,
error_tx: tokio::sync::broadcast::Sender<String>,
#[allow(clippy::type_complexity)]
photo_buffer: Mutex<HashMap<String, Vec<(String, Option<String>)>>>,
pub(crate) photo_debounce: Mutex<HashMap<String, CancellationToken>>,
}
impl Default for WhatsAppState {
fn default() -> Self {
Self::new()
}
}
impl WhatsAppState {
pub fn new() -> Self {
let (qr_tx, _) = tokio::sync::broadcast::channel(8);
let (connected_tx, _) = tokio::sync::broadcast::channel(4);
let (error_tx, _) = tokio::sync::broadcast::channel(4);
Self {
client: Mutex::new(None),
owner_jid: Mutex::new(None),
pending_approvals: Mutex::new(HashMap::new()),
pending_questions: Mutex::new(HashMap::new()),
cancel_tokens: Mutex::new(HashMap::new()),
qr_tx,
connected_tx,
error_tx,
photo_buffer: Mutex::new(HashMap::new()),
photo_debounce: Mutex::new(HashMap::new()),
}
}
pub async fn register_pending_approval(
&self,
phone: String,
tx: tokio::sync::oneshot::Sender<WaApproval>,
) {
self.pending_approvals.lock().await.insert(phone, tx);
}
pub async fn resolve_pending_approval(
&self,
phone: &str,
choice: WaApproval,
) -> Option<WaApproval> {
if let Some(tx) = self.pending_approvals.lock().await.remove(phone) {
let _ = tx.send(choice);
Some(choice)
} else {
None
}
}
pub async fn register_pending_question(
&self,
phone: String,
tx: tokio::sync::oneshot::Sender<String>,
options: Vec<String>,
) {
self.pending_questions
.lock()
.await
.insert(phone, (tx, options));
}
pub async fn resolve_pending_question(&self, phone: &str, reply: &str) -> Option<String> {
let parsed: usize = reply.trim().parse().ok()?;
if parsed == 0 {
return None;
}
let idx = parsed - 1;
let (tx, options) = self.pending_questions.lock().await.remove(phone)?;
let answer = options.get(idx)?.clone();
let _ = tx.send(answer.clone());
Some(answer)
}
pub async fn has_pending_question(&self, phone: &str) -> bool {
self.pending_questions.lock().await.contains_key(phone)
}
pub fn broadcast_qr(&self, code: &str) {
let _ = self.qr_tx.send(code.to_string());
}
pub fn broadcast_connected(&self) {
let _ = self.connected_tx.send(());
}
pub fn subscribe_qr(&self) -> tokio::sync::broadcast::Receiver<String> {
self.qr_tx.subscribe()
}
pub fn subscribe_connected(&self) -> tokio::sync::broadcast::Receiver<()> {
self.connected_tx.subscribe()
}
pub fn broadcast_error(&self, msg: &str) {
let _ = self.error_tx.send(msg.to_string());
}
pub fn subscribe_error(&self) -> tokio::sync::broadcast::Receiver<String> {
self.error_tx.subscribe()
}
pub async fn set_connected(&self, client: Arc<Client>, owner_jid: Option<String>) {
*self.client.lock().await = Some(client);
if let Some(jid) = owner_jid {
*self.owner_jid.lock().await = Some(jid);
}
self.broadcast_connected();
}
pub async fn client(&self) -> Option<Arc<Client>> {
self.client.lock().await.clone()
}
pub async fn owner_jid(&self) -> Option<String> {
self.owner_jid.lock().await.clone()
}
pub async fn is_connected(&self) -> bool {
self.client.lock().await.is_some()
}
pub async fn store_cancel_token(&self, session_id: Uuid, token: CancellationToken) {
let mut tokens = self.cancel_tokens.lock().await;
if let Some(old) = tokens.remove(&session_id) {
tracing::warn!(
"WhatsApp: cancelling previous in-flight agent call for session {}",
session_id
);
old.cancel();
}
tokens.insert(session_id, token);
}
pub async fn cancel_session(&self, session_id: Uuid) -> bool {
if let Some(token) = self.cancel_tokens.lock().await.remove(&session_id) {
token.cancel();
true
} else {
false
}
}
pub async fn remove_cancel_token(&self, session_id: Uuid) {
let mut tokens = self.cancel_tokens.lock().await;
if let Some(token) = tokens.get(&session_id)
&& token.is_cancelled()
{
tokens.remove(&session_id);
}
}
pub async fn buffer_photo(
&self,
chat_jid: &str,
img_marker: String,
caption: Option<String>,
) -> usize {
let mut buffer = self.photo_buffer.lock().await;
let entry = buffer.entry(chat_jid.to_string()).or_default();
entry.push((img_marker, caption));
entry.len()
}
pub async fn drain_photo_buffer(&self, chat_jid: &str) -> (Vec<String>, Option<String>) {
let mut buffer = self.photo_buffer.lock().await;
if let Some(entries) = buffer.remove(chat_jid) {
let caption = entries
.iter()
.find_map(|(_, c)| c.as_ref().filter(|s| !s.trim().is_empty()).cloned());
let markers: Vec<String> = entries.into_iter().map(|(m, _)| m).collect();
(markers, caption)
} else {
(Vec::new(), None)
}
}
pub async fn reset_photo_debounce(&self, chat_jid: &str) -> CancellationToken {
let mut debounce = self.photo_debounce.lock().await;
if let Some(old_token) = debounce.remove(chat_jid) {
old_token.cancel();
}
let token = CancellationToken::new();
debounce.insert(chat_jid.to_string(), token.clone());
token
}
pub async fn wait_photo_debounce(&self, token: &CancellationToken) -> bool {
tokio::select! {
_ = token.cancelled() => false,
_ = tokio::time::sleep(std::time::Duration::from_secs(3)) => true,
}
}
pub async fn cleanup_photo_debounce(&self, chat_jid: &str) {
let mut debounce = self.photo_debounce.lock().await;
debounce.remove(chat_jid);
}
}