use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use log;
use crate::context::LLMContext;
use crate::error::Result;
use crate::frames::{
DataFrame, Frame, FrameDirection, FrameHandler, FrameInner, FrameProcessor, SystemFrame,
};
/// Mutable per-turn bookkeeping for [`LLMUserAggregator`], kept behind a mutex.
struct State {
    /// Set to `true` on a VAD "user started speaking" frame and back to
    /// `false` on a VAD "user stopped speaking" frame.
    user_speaking: bool,
    /// Trimmed transcript fragments collected for the current turn, joined
    /// with single spaces; drained whenever the turn is flushed.
    aggregation: String,
}
/// Frame handler that turns the user's transcribed speech into LLM turns.
///
/// Transcription fragments are buffered in `state`. When a turn is flushed, the
/// buffered text is appended to `context` as a user message. An LLM-context
/// frame carrying a clone of the same `Arc` is then pushed downstream.
pub struct LLMUserAggregator {
    /// Conversation context; the same `Arc` is handed downstream inside the
    /// LLM-context frame, so other holders see the appended user messages.
    context: Arc<Mutex<LLMContext>>,
    /// Per-turn text buffer and speaking flag. It is only locked for short,
    /// statement- or block-scoped sections, never across an `.await`.
    state: Mutex<State>,
}
impl LLMUserAggregator {
    /// Creates the aggregator and wraps it in a [`FrameProcessor`] named
    /// `"LLMUserAggregator"`.
    ///
    /// The aggregator starts with an empty text buffer and the user marked as
    /// not speaking. `context` is the conversation context that flushed user
    /// turns are appended to. It is also what gets forwarded downstream in
    /// LLM-context frames.
    pub fn new(context: Arc<Mutex<LLMContext>>) -> FrameProcessor {
        let initial = State {
            user_speaking: false,
            aggregation: String::new(),
        };
        let handler = Self {
            state: Mutex::new(initial),
            context,
        };
        // NOTE(review): the meaning of the trailing `false` flag is defined by
        // `FrameProcessor::new`, which is not visible here.
        FrameProcessor::new("LLMUserAggregator", Box::new(handler), false)
    }

    /// Drains the buffered user text and, if it is non-blank, records it as a
    /// user message in the shared context and pushes an LLM-context frame
    /// downstream.
    ///
    /// Returns `Ok(true)` when a context frame was pushed. Returns `Ok(false)`
    /// when the buffer was empty after trimming; the buffer is cleared in
    /// either case.
    ///
    /// # Errors
    /// Propagates any error returned by `FrameProcessor::push_frame`.
    ///
    /// # Panics
    /// Panics if the state mutex or the context mutex is poisoned.
    async fn flush_and_trigger(&self, processor: &FrameProcessor) -> Result<bool> {
        // Take the text out under a short-lived lock so that no guard is still
        // alive when we reach the `.await` below.
        let pending: String = {
            let mut guard = self.state.lock().unwrap();
            let trimmed = guard.aggregation.trim().to_owned();
            guard.aggregation.clear();
            trimmed
        };

        if pending.is_empty() {
            log::debug!("LLMUserAggregator: empty turn, skipping LLM trigger");
            return Ok(false);
        }

        log::info!("LLMUserAggregator: user said: '{}'", pending);
        // The context guard is a temporary, released at the end of this
        // statement.
        self.context.lock().unwrap().add_user_message(&pending);

        let context_frame = Frame::llm_context(Arc::clone(&self.context));
        processor
            .push_frame(context_frame, FrameDirection::Downstream)
            .await?;
        Ok(true)
    }
}
#[async_trait]
impl FrameHandler for LLMUserAggregator {
    /// Routes incoming frames for the user side of the conversation.
    ///
    /// * Transcription data frames are trimmed; blank ones are dropped. The
    ///   rest are appended to the pending text, space-separated. They are not
    ///   forwarded. If the user is not currently marked as speaking, the
    ///   pending text is flushed right away (presumably to handle transcripts
    ///   that arrive after the VAD stop event).
    /// * VAD "user started speaking" marks the user as speaking and forwards
    ///   the frame. It then broadcasts an interruption if the processor allows
    ///   interruptions.
    /// * VAD "user stopped speaking" clears the speaking flag and forwards the
    ///   frame. It then flushes any pending text to the LLM.
    /// * Every other frame is forwarded unchanged in its original direction.
    ///
    /// # Errors
    /// Propagates errors from pushing frames, broadcasting interruptions, or
    /// flushing the pending turn.
    async fn on_process_frame(
        &self,
        processor: &FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
    ) -> Result<()> {
        match &frame.inner {
            FrameInner::Data(DataFrame::Transcription(transcription)) => {
                let fragment = transcription.text.trim();
                if fragment.is_empty() {
                    return Ok(());
                }
                // Append under a short-lived lock; the guard is dropped at the
                // end of this block, before any `.await`.
                let flush_now = {
                    let mut guard = self.state.lock().unwrap();
                    if !guard.aggregation.is_empty() {
                        guard.aggregation.push(' ');
                    }
                    guard.aggregation.push_str(fragment);
                    !guard.user_speaking
                };
                if flush_now {
                    self.flush_and_trigger(processor).await?;
                }
            }
            FrameInner::System(SystemFrame::VADUserStartedSpeaking { .. }) => {
                self.state.lock().unwrap().user_speaking = true;
                processor.push_frame(frame, direction).await?;
                if processor.interruptions_allowed() {
                    processor.broadcast_interruption().await?;
                }
            }
            FrameInner::System(SystemFrame::VADUserStoppedSpeaking { .. }) => {
                self.state.lock().unwrap().user_speaking = false;
                processor.push_frame(frame, direction).await?;
                self.flush_and_trigger(processor).await?;
            }
            _ => {
                processor.push_frame(frame, direction).await?;
            }
        }
        Ok(())
    }
}