use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::{mpsc, Mutex};
use tracing::{debug, info, warn};
use crate::error::PluginError;
use crate::message::MessagePayload;
use crate::traits::{CancellationToken, ChannelAdapter, ChannelAdapterHost};
/// Activity state reported by the voice channel.
///
/// Serialized by serde as a snake_case string (for example `"idle"`), which is
/// the same text the `Display` impl in this file produces.
// `non_exhaustive` allows variants to be added later; code in other crates
// that matches on this enum therefore needs a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum VoiceStatus {
/// Status a new `VoiceChannel` starts in, and the one set again just before `start` returns.
Idle,
/// Set when `start` begins and again at the end of a text `send`.
Listening,
/// Not set anywhere in this file; presumably speech-to-text is running (TODO confirm).
Transcribing,
/// Not set anywhere in this file; presumably a request is being handled (TODO confirm).
Processing,
/// Set by `send` for a text payload, right before it switches back to `Listening`.
Speaking,
}
/// Formats the status as its lowercase name (`idle`, `listening`, ...).
///
/// The name is emitted through [`std::fmt::Formatter::pad`], so formatter
/// flags are honoured: `{:>12}` right-aligns, `{:-<12}` fills with `-`, and a
/// precision such as `{:.3}` truncates, exactly as they do for `&str`. With a
/// plain `{}` the output is just the bare name.
impl std::fmt::Display for VoiceStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must fail to compile here
        // until it is given a name.
        let name = match self {
            Self::Idle => "idle",
            Self::Listening => "listening",
            Self::Transcribing => "transcribing",
            Self::Processing => "processing",
            Self::Speaking => "speaking",
        };
        // `pad` (unlike `write!`/`write_str`) applies width, fill, alignment
        // and precision from the format spec.
        f.pad(name)
    }
}
/// Voice channel adapter state: the current [`VoiceStatus`] plus a bounded
/// queue on which each status change is offered to a listener.
pub struct VoiceChannel {
/// Sending half of the status notification channel; `new` hands out the receiving half.
status_tx: mpsc::Sender<VoiceStatus>,
/// Most recently set status, guarded by an async (tokio) mutex.
status: Arc<Mutex<VoiceStatus>>,
}
impl VoiceChannel {
    /// Builds a channel whose status is [`VoiceStatus::Idle`].
    ///
    /// Returns the channel together with the receiving half of its status
    /// notification queue, which buffers up to 32 pending updates.
    pub fn new() -> (Self, mpsc::Receiver<VoiceStatus>) {
        let (sender, receiver) = mpsc::channel(32);
        let shared_status = Arc::new(Mutex::new(VoiceStatus::Idle));
        (
            Self {
                status_tx: sender,
                status: shared_status,
            },
            receiver,
        )
    }

    /// Returns a copy of the most recently recorded status.
    pub async fn current_status(&self) -> VoiceStatus {
        let guard = self.status.lock().await;
        *guard
    }

    /// Records `new_status` and offers it to the notification receiver.
    ///
    /// Delivery is best-effort: if the queue is full or the receiver has been
    /// dropped, the notification is discarded and only a debug event is logged.
    /// The stored status is updated either way.
    async fn set_status(&self, new_status: VoiceStatus) {
        // The guard lives until the end of the function, so the stored value
        // and the queued notification are updated under the same lock.
        let mut guard = self.status.lock().await;
        *guard = new_status;

        // `try_send` never waits, so holding the lock here cannot stall.
        let outcome = self.status_tx.try_send(new_status);
        if let Err(send_error) = outcome {
            debug!(
                status = %new_status,
                error = %send_error,
                "Status notification dropped (receiver full or closed)"
            );
        }
    }
}
#[async_trait]
impl ChannelAdapter for VoiceChannel {
    /// Returns the adapter identifier, `"voice"`.
    fn name(&self) -> &str {
        "voice"
    }

    /// Returns the human-readable label, `"Voice (Talk Mode)"`.
    fn display_name(&self) -> &str {
        "Voice (Talk Mode)"
    }

    /// Always `false`: this adapter reports no thread support.
    fn supports_threads(&self) -> bool {
        false
    }

    /// Always `true`: this adapter reports media support.
    fn supports_media(&self) -> bool {
        true
    }

    /// Runs the (stub) voice channel until `cancel` fires.
    ///
    /// Sets the status to `Listening`, waits for cancellation, then sets the
    /// status back to `Idle` and returns `Ok(())`. The host is unused.
    async fn start(
        &self,
        _host: Arc<dyn ChannelAdapterHost>,
        cancel: CancellationToken,
    ) -> Result<(), PluginError> {
        info!("Voice channel starting (stub mode)");
        self.set_status(VoiceStatus::Listening).await;

        // Stub mode has no work of its own: park until shutdown is requested.
        cancel.cancelled().await;

        info!("Voice channel shutting down");
        self.set_status(VoiceStatus::Idle).await;
        Ok(())
    }

    /// "Speaks" a text payload (stub: the text is only logged).
    ///
    /// For a text payload the status goes `Speaking` then `Listening`, and the
    /// returned id is `voice-<current UTC time in milliseconds>`. A payload with
    /// no text is skipped with a warning and yields the id `"voice-skipped"`.
    /// `_target` is ignored.
    async fn send(
        &self,
        _target: &str,
        payload: &MessagePayload,
    ) -> Result<String, PluginError> {
        match payload.as_text() {
            Some(text) => {
                info!(text = %text, "Voice channel would speak via TTS (stub)");
            }
            None => {
                warn!("Voice channel received non-text payload, ignoring");
                return Ok(String::from("voice-skipped"));
            }
        }

        self.set_status(VoiceStatus::Speaking).await;
        self.set_status(VoiceStatus::Listening).await;

        let sent_at_ms = chrono::Utc::now().timestamp_millis();
        Ok(format!("voice-{}", sent_at_ms))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant paired with the lowercase name expected from both
    /// `Display` and serde.
    const NAMED_STATUSES: [(VoiceStatus, &str); 5] = [
        (VoiceStatus::Idle, "idle"),
        (VoiceStatus::Listening, "listening"),
        (VoiceStatus::Transcribing, "transcribing"),
        (VoiceStatus::Processing, "processing"),
        (VoiceStatus::Speaking, "speaking"),
    ];

    /// `Display` yields the lowercase variant name.
    #[test]
    fn voice_status_display() {
        for (status, name) in NAMED_STATUSES.iter() {
            assert_eq!(status.to_string(), *name);
        }
    }

    /// Serializing then deserializing gives back the same variant.
    #[test]
    fn voice_status_serde_roundtrip() {
        for (status, _) in NAMED_STATUSES.iter() {
            let encoded = serde_json::to_string(status).unwrap();
            let decoded: VoiceStatus = serde_json::from_str(&encoded).unwrap();
            assert_eq!(decoded, *status);
        }
    }

    /// The JSON form is the quoted snake_case name.
    #[test]
    fn voice_status_json_values() {
        for (status, name) in NAMED_STATUSES.iter() {
            let encoded = serde_json::to_string(status).unwrap();
            assert_eq!(encoded, format!("\"{}\"", name));
        }
    }

    #[tokio::test]
    async fn voice_channel_name() {
        let (voice, _updates) = VoiceChannel::new();
        assert_eq!(voice.name(), "voice");
    }

    #[tokio::test]
    async fn voice_channel_display_name() {
        let (voice, _updates) = VoiceChannel::new();
        assert_eq!(voice.display_name(), "Voice (Talk Mode)");
    }

    #[tokio::test]
    async fn voice_channel_no_threads() {
        let (voice, _updates) = VoiceChannel::new();
        assert!(!voice.supports_threads());
    }

    #[tokio::test]
    async fn voice_channel_supports_media() {
        let (voice, _updates) = VoiceChannel::new();
        assert!(voice.supports_media());
    }

    #[tokio::test]
    async fn voice_channel_initial_status_is_idle() {
        let (voice, _updates) = VoiceChannel::new();
        assert_eq!(voice.current_status().await, VoiceStatus::Idle);
    }

    /// A text payload produces a `voice-` id and the Speaking -> Listening
    /// notification sequence.
    #[tokio::test]
    async fn voice_channel_send_with_text() {
        let (voice, mut updates) = VoiceChannel::new();
        let outgoing = MessagePayload::text("Hello from the agent");

        let message_id = voice.send("user", &outgoing).await.unwrap();
        assert!(message_id.starts_with("voice-"));

        assert_eq!(updates.recv().await, Some(VoiceStatus::Speaking));
        assert_eq!(updates.recv().await, Some(VoiceStatus::Listening));
    }

    /// A payload without text is skipped and reported with the fixed id.
    #[tokio::test]
    async fn voice_channel_send_with_non_text_returns_skipped() {
        let (voice, _updates) = VoiceChannel::new();
        let outgoing = MessagePayload::structured(serde_json::json!({"key": "val"}));

        let message_id = voice.send("user", &outgoing).await.unwrap();
        assert_eq!(message_id, "voice-skipped");
    }

    /// `start` reports Listening, returns `Ok` once cancelled, and reports Idle
    /// on the way out.
    #[tokio::test]
    async fn voice_channel_start_and_cancel() {
        use std::collections::HashMap;

        /// Host that accepts and discards every inbound delivery.
        struct StubHost;

        #[async_trait]
        impl ChannelAdapterHost for StubHost {
            async fn deliver_inbound(
                &self,
                _channel: &str,
                _sender_id: &str,
                _chat_id: &str,
                _payload: MessagePayload,
                _metadata: HashMap<String, serde_json::Value>,
            ) -> Result<(), PluginError> {
                Ok(())
            }
        }

        let (voice, mut updates) = VoiceChannel::new();
        let voice = Arc::new(voice);
        let host: Arc<dyn ChannelAdapterHost> = Arc::new(StubHost);
        let token = CancellationToken::new();

        // Run the adapter on its own task so the test can observe and cancel it.
        let task = {
            let voice = Arc::clone(&voice);
            let token = token.clone();
            tokio::spawn(async move { voice.start(host, token).await })
        };

        assert_eq!(updates.recv().await, Some(VoiceStatus::Listening));

        token.cancel();
        let outcome = task.await.unwrap();
        assert!(outcome.is_ok());

        assert_eq!(updates.recv().await, Some(VoiceStatus::Idle));
    }
}