Skip to main content

clawft_plugin/voice/
channel.rs

1//! Voice channel adapter implementing [`ChannelAdapter`].
2//!
3//! Provides a `VoiceChannel` that bridges the voice pipeline
4//! (capture -> VAD -> STT -> agent -> TTS -> playback) as a
5//! channel adapter. Currently a stub implementation -- real
6//! audio processing deferred until sherpa-rs/cpal VP completes.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use tokio::sync::{mpsc, Mutex};
12use tracing::{debug, info, warn};
13
14use crate::error::PluginError;
15use crate::message::MessagePayload;
16use crate::traits::{CancellationToken, ChannelAdapter, ChannelAdapterHost};
17
18/// Voice channel status for WebSocket reporting.
19#[non_exhaustive]
20#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
21#[serde(rename_all = "snake_case")]
22pub enum VoiceStatus {
23    /// Channel is idle, not actively listening.
24    Idle,
25    /// Listening for speech via VAD.
26    Listening,
27    /// Transcribing detected speech via STT.
28    Transcribing,
29    /// Processing transcribed text through the agent pipeline.
30    Processing,
31    /// Speaking agent response via TTS.
32    Speaking,
33}
34
35impl std::fmt::Display for VoiceStatus {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match self {
38            Self::Idle => write!(f, "idle"),
39            Self::Listening => write!(f, "listening"),
40            Self::Transcribing => write!(f, "transcribing"),
41            Self::Processing => write!(f, "processing"),
42            Self::Speaking => write!(f, "speaking"),
43        }
44    }
45}
46
47/// Voice channel adapter for continuous voice conversation.
48///
49/// Implements [`ChannelAdapter`] to integrate the voice pipeline
50/// with the agent system. Status transitions are reported via an
51/// `mpsc` channel for WebSocket broadcasting.
52///
53/// # Stub Behavior
54///
55/// This implementation is a stub. The `start()` method waits for
56/// cancellation rather than processing real audio. The `send()`
57/// method logs the outbound text that would be spoken via TTS.
58pub struct VoiceChannel {
59    status_tx: mpsc::Sender<VoiceStatus>,
60    status: Arc<Mutex<VoiceStatus>>,
61}
62
63impl VoiceChannel {
64    /// Create a new voice channel.
65    ///
66    /// Returns the channel and a receiver for status updates.
67    /// The receiver can be used to broadcast status changes
68    /// to WebSocket clients.
69    pub fn new() -> (Self, mpsc::Receiver<VoiceStatus>) {
70        let (status_tx, status_rx) = mpsc::channel(32);
71        let channel = Self {
72            status_tx,
73            status: Arc::new(Mutex::new(VoiceStatus::Idle)),
74        };
75        (channel, status_rx)
76    }
77
78    /// Get the current voice status.
79    pub async fn current_status(&self) -> VoiceStatus {
80        *self.status.lock().await
81    }
82
83    /// Update the voice status and notify listeners.
84    async fn set_status(&self, new_status: VoiceStatus) {
85        let mut status = self.status.lock().await;
86        *status = new_status;
87        // Best-effort send -- if the receiver is dropped, we just log.
88        if let Err(e) = self.status_tx.try_send(new_status) {
89            debug!(
90                status = %new_status,
91                error = %e,
92                "Status notification dropped (receiver full or closed)"
93            );
94        }
95    }
96}
97
98#[async_trait]
99impl ChannelAdapter for VoiceChannel {
100    fn name(&self) -> &str {
101        "voice"
102    }
103
104    fn display_name(&self) -> &str {
105        "Voice (Talk Mode)"
106    }
107
108    fn supports_threads(&self) -> bool {
109        false
110    }
111
112    fn supports_media(&self) -> bool {
113        true
114    }
115
116    /// Start the voice channel loop.
117    ///
118    /// In the real implementation this would:
119    /// 1. Start audio capture
120    /// 2. Run VAD to detect speech segments
121    /// 3. Feed speech to STT for transcription
122    /// 4. Deliver transcribed text to the agent pipeline via `host`
123    /// 5. Loop until cancelled
124    ///
125    /// The stub simply sets status to Listening and waits for cancellation.
126    async fn start(
127        &self,
128        _host: Arc<dyn ChannelAdapterHost>,
129        cancel: CancellationToken,
130    ) -> Result<(), PluginError> {
131        info!("Voice channel starting (stub mode)");
132        self.set_status(VoiceStatus::Listening).await;
133
134        // Stub: wait for cancellation.
135        // In the real implementation, this loop would poll audio
136        // capture, run VAD, and deliver transcribed utterances.
137        cancel.cancelled().await;
138
139        info!("Voice channel shutting down");
140        self.set_status(VoiceStatus::Idle).await;
141        Ok(())
142    }
143
144    /// Send a message through the voice channel (TTS output).
145    ///
146    /// In the real implementation this would synthesize the text
147    /// via TTS and play it through the speaker. The stub logs the
148    /// text and transitions status: Speaking -> Listening.
149    async fn send(
150        &self,
151        _target: &str,
152        payload: &MessagePayload,
153    ) -> Result<String, PluginError> {
154        let text = match payload.as_text() {
155            Some(t) => t,
156            None => {
157                warn!("Voice channel received non-text payload, ignoring");
158                return Ok("voice-skipped".into());
159            }
160        };
161
162        info!(text = %text, "Voice channel would speak via TTS (stub)");
163        self.set_status(VoiceStatus::Speaking).await;
164
165        // Stub: in real implementation, TTS synthesis and playback happen here.
166        // For now, just transition back to listening.
167        self.set_status(VoiceStatus::Listening).await;
168
169        Ok(format!("voice-{}", chrono::Utc::now().timestamp_millis()))
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Every status paired with its expected `Display` / JSON string form.
    const CASES: [(VoiceStatus, &str); 5] = [
        (VoiceStatus::Idle, "idle"),
        (VoiceStatus::Listening, "listening"),
        (VoiceStatus::Transcribing, "transcribing"),
        (VoiceStatus::Processing, "processing"),
        (VoiceStatus::Speaking, "speaking"),
    ];

    #[test]
    fn voice_status_display() {
        for (status, expected) in CASES {
            assert_eq!(status.to_string(), expected, "Display for {status:?}");
        }
    }

    #[test]
    fn voice_status_serde_roundtrip() {
        for (status, _) in CASES {
            let json = serde_json::to_string(&status).unwrap();
            let restored: VoiceStatus = serde_json::from_str(&json).unwrap();
            assert_eq!(restored, status, "roundtrip of {status:?}");
        }
    }

    #[test]
    fn voice_status_json_values() {
        for (status, name) in CASES {
            assert_eq!(
                serde_json::to_string(&status).unwrap(),
                format!("\"{name}\""),
                "JSON for {status:?}"
            );
        }
    }

    #[test]
    fn voice_channel_name() {
        let (channel, _rx) = VoiceChannel::new();
        assert_eq!(channel.name(), "voice");
    }

    #[test]
    fn voice_channel_display_name() {
        let (channel, _rx) = VoiceChannel::new();
        assert_eq!(channel.display_name(), "Voice (Talk Mode)");
    }

    #[test]
    fn voice_channel_no_threads() {
        let (channel, _rx) = VoiceChannel::new();
        assert!(!channel.supports_threads());
    }

    #[test]
    fn voice_channel_supports_media() {
        let (channel, _rx) = VoiceChannel::new();
        assert!(channel.supports_media());
    }

    #[tokio::test]
    async fn voice_channel_initial_status_is_idle() {
        let (channel, _rx) = VoiceChannel::new();
        assert_eq!(channel.current_status().await, VoiceStatus::Idle);
    }

    #[tokio::test]
    async fn voice_channel_send_with_text() {
        let (channel, mut rx) = VoiceChannel::new();
        let payload = MessagePayload::text("Hello from the agent");
        let msg_id = channel.send("user", &payload).await.unwrap();
        assert!(msg_id.starts_with("voice-"));

        // Should have received Speaking then Listening status updates.
        assert_eq!(rx.recv().await.unwrap(), VoiceStatus::Speaking);
        assert_eq!(rx.recv().await.unwrap(), VoiceStatus::Listening);
    }

    #[tokio::test]
    async fn voice_channel_send_with_non_text_returns_skipped() {
        let (channel, _rx) = VoiceChannel::new();
        let payload = MessagePayload::structured(serde_json::json!({"key": "val"}));
        let msg_id = channel.send("user", &payload).await.unwrap();
        assert_eq!(msg_id, "voice-skipped");
    }

    #[tokio::test]
    async fn voice_channel_start_and_cancel() {
        use std::collections::HashMap;

        /// Host that accepts and discards every inbound message.
        struct StubHost;

        #[async_trait]
        impl ChannelAdapterHost for StubHost {
            async fn deliver_inbound(
                &self,
                _channel: &str,
                _sender_id: &str,
                _chat_id: &str,
                _payload: MessagePayload,
                _metadata: HashMap<String, serde_json::Value>,
            ) -> Result<(), PluginError> {
                Ok(())
            }
        }

        let (channel, mut rx) = VoiceChannel::new();
        let channel = Arc::new(channel);
        let host: Arc<dyn ChannelAdapterHost> = Arc::new(StubHost);
        let cancel = CancellationToken::new();

        let handle = tokio::spawn({
            let channel = Arc::clone(&channel);
            let cancel = cancel.clone();
            async move { channel.start(host, cancel).await }
        });

        // Wait for the Listening status.
        assert_eq!(rx.recv().await.unwrap(), VoiceStatus::Listening);

        // Cancel and wait for shutdown.
        cancel.cancel();
        let result = handle.await.unwrap();
        assert!(result.is_ok());

        // Should have received Idle status on shutdown.
        assert_eq!(rx.recv().await.unwrap(), VoiceStatus::Idle);
    }
}