use zeph_llm::provider::Role;
use super::Agent;
use super::context;
use super::error;
use super::feedback_detector;
pub(super) fn feedback_verdict_into_signal(
verdict: &zeph_llm::classifier::llm::FeedbackVerdict,
user_message: &str,
) -> Option<feedback_detector::CorrectionSignal> {
if !verdict.is_correction {
return None;
}
let confidence = verdict.confidence.clamp(0.0, 1.0);
let kind_raw = verdict.kind.trim().to_lowercase().replace(' ', "_");
let kind = match kind_raw.as_str() {
"explicit_rejection" => feedback_detector::CorrectionKind::ExplicitRejection,
"alternative_request" => feedback_detector::CorrectionKind::AlternativeRequest,
"repetition" => feedback_detector::CorrectionKind::Repetition,
"self_correction" => feedback_detector::CorrectionKind::SelfCorrection,
other => {
tracing::warn!(
kind = other,
"llm-classifier returned unknown correction kind, discarding"
);
return None;
}
};
Some(feedback_detector::CorrectionSignal {
confidence,
kind,
feedback_text: user_message.to_owned(),
})
}
/// Persists a detected user correction to semantic memory.
///
/// Stores the correction row via the SQLite backend and then, on success,
/// stores an embedding for it. Both steps are best-effort: failures are
/// logged via `tracing::warn!` and never propagated. A `None` `memory` is a
/// no-op; an empty `skill_name` is stored as no skill association.
pub(super) async fn store_correction_in_memory(
    memory: Option<std::sync::Arc<zeph_memory::semantic::SemanticMemory>>,
    conv_id: Option<zeph_memory::ConversationId>,
    assistant_snippet: &str,
    user_msg: &str,
    skill_name: String,
    kind_str: &str,
) {
    // Nothing to do without a memory backend.
    let Some(mem) = memory else { return };

    // Cap the persisted correction text at 500 characters.
    let correction_text = context::truncate_chars(user_msg, 500);
    // Empty skill name means "no associated skill".
    let skill = (!skill_name.is_empty()).then_some(skill_name.as_str());

    let stored = mem
        .sqlite()
        .store_user_correction(
            conv_id.map(|c| c.0),
            assistant_snippet,
            &correction_text,
            skill,
            kind_str,
        )
        .await;

    match stored {
        Ok(correction_id) => {
            // Embedding failure is non-fatal; the correction row already exists.
            if let Err(e) = mem
                .store_correction_embedding(correction_id, &correction_text)
                .await
            {
                tracing::warn!("failed to store correction embedding: {e:#}");
            }
        }
        Err(e) => tracing::warn!("failed to store judge correction: {e:#}"),
    }
}
impl<C: crate::channel::Channel> Agent<C> {
    /// Spawns a background task that decides whether `trimmed` (the latest
    /// user message) is a correction of the previous assistant reply, and
    /// persists any detected correction via [`store_correction_in_memory`].
    ///
    /// Prefers the configured LLM feedback classifier when present; otherwise
    /// falls back to [`feedback_detector::JudgeDetector`] driven by the judge
    /// provider (or the primary provider if no judge provider is configured).
    /// Returns immediately — all classification and storage runs in the
    /// spawned task.
    // Two near-identical spawn branches keep this over clippy's line budget.
    #[allow(clippy::too_many_lines)]
    pub(super) fn spawn_judge_correction_check(
        &mut self,
        trimmed: &str,
        conv_id: Option<zeph_memory::ConversationId>,
    ) {
        // Owned copies of everything the 'static spawned task will need.
        let assistant_snippet = self.last_assistant_response();
        let user_msg_owned = trimmed.to_owned();
        let memory_arc = self.memory_state.memory.clone();
        // Attribute the correction to the first active skill, if any.
        let skill_name = self
            .skill_state
            .active_skill_names
            .first()
            .cloned()
            .unwrap_or_default();
        let conv_id_bg = conv_id;
        // Default confidence threshold 0.6 when no learning config exists.
        let confidence_threshold = self
            .learning_engine
            .config
            .as_ref()
            .map_or(0.6, |c| c.correction_confidence_threshold);
        if let Some(llm_classifier) = self.feedback.llm_classifier.clone() {
            // Classifier path: also publishes classifier-metrics snapshots.
            let user_msg = user_msg_owned.clone();
            let assistant = assistant_snippet.clone();
            let memory_arc2 = memory_arc.clone();
            let skill_name2 = skill_name.clone();
            let classifier_metrics_bg = self.metrics.classifier_metrics.clone();
            let metrics_tx_bg = self.metrics.metrics_tx.clone();
            tokio::spawn(async move {
                match llm_classifier
                    .classify_feedback(&user_msg, &assistant, confidence_threshold)
                    .await
                {
                    Ok(verdict) => {
                        // Push a fresh metrics snapshot after every classifier call,
                        // regardless of whether a correction was detected.
                        if let (Some(ref cm), Some(ref tx)) = (classifier_metrics_bg, metrics_tx_bg)
                        {
                            let snap = cm.snapshot();
                            tx.send_modify(|ms| ms.classifier = snap);
                        }
                        if let Some(signal) = feedback_verdict_into_signal(&verdict, &user_msg) {
                            let is_self_correction =
                                signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
                            tracing::info!(
                                kind = signal.kind.as_str(),
                                confidence = signal.confidence,
                                source = "llm-classifier",
                                is_self_correction,
                                "correction signal detected"
                            );
                            store_correction_in_memory(
                                memory_arc2,
                                conv_id_bg,
                                &assistant,
                                &user_msg,
                                skill_name2,
                                signal.kind.as_str(),
                            )
                            .await;
                        }
                    }
                    Err(e) => {
                        tracing::warn!("llm-classifier failed: {e:#}");
                    }
                }
            });
        } else {
            // Judge path: no classifier configured, fall back to the judge
            // provider (or the primary provider when none is set).
            let judge_provider = self
                .providers
                .judge_provider
                .clone()
                .unwrap_or_else(|| self.provider.clone());
            let user_msg = user_msg_owned.clone();
            let assistant = assistant_snippet.clone();
            tokio::spawn(async move {
                match feedback_detector::JudgeDetector::evaluate(
                    &judge_provider,
                    &user_msg,
                    &assistant,
                    confidence_threshold,
                )
                .await
                {
                    Ok(verdict) => {
                        if let Some(signal) = verdict.into_signal(&user_msg) {
                            let is_self_correction =
                                signal.kind == feedback_detector::CorrectionKind::SelfCorrection;
                            tracing::info!(
                                kind = signal.kind.as_str(),
                                confidence = signal.confidence,
                                source = "judge",
                                is_self_correction,
                                "correction signal detected"
                            );
                            store_correction_in_memory(
                                memory_arc,
                                conv_id_bg,
                                &assistant,
                                &user_msg,
                                skill_name,
                                signal.kind.as_str(),
                            )
                            .await;
                        }
                    }
                    Err(e) => {
                        tracing::warn!("judge detector failed: {e:#}");
                    }
                }
            });
        }
    }
    /// Runs correction detection on the latest user message `trimmed`.
    ///
    /// A fast regex detector always runs first. When the heuristics and rate
    /// limit allow, the (asynchronous) LLM judge is kicked off instead via
    /// [`Self::spawn_judge_correction_check`], and no synchronous signal is
    /// recorded here. Otherwise a regex-detected signal is recorded inline:
    /// a skill outcome (unless it is a self-correction) plus a persisted
    /// correction row and its embedding.
    #[allow(clippy::too_many_lines)]
    pub(super) async fn detect_and_record_corrections(
        &mut self,
        trimmed: &str,
        conv_id: Option<zeph_memory::ConversationId>,
    ) {
        // Detection is enabled by default when no learning config is present.
        let correction_detection_enabled = self
            .learning_engine
            .config
            .as_ref()
            .is_none_or(|c| c.correction_detection);
        if !correction_detection_enabled {
            return;
        }
        // Prior user messages give the regex detector repetition context.
        let previous_user_messages: Vec<&str> = self
            .msg
            .messages
            .iter()
            .filter(|m| m.role == Role::User)
            .map(|m| m.content.as_str())
            .collect();
        let regex_signal = self
            .feedback
            .detector
            .detect(trimmed, &previous_user_messages);
        let judge_should_run = if self.feedback.llm_classifier.is_some() {
            // Classifier configured: lazily create the judge with adaptive
            // thresholds (defaults 0.5 / 0.8 when no config is present).
            let adaptive_low = self
                .learning_engine
                .config
                .as_ref()
                .map_or(0.5, |c| c.judge_adaptive_low);
            let adaptive_high = self
                .learning_engine
                .config
                .as_ref()
                .map_or(0.8, |c| c.judge_adaptive_high);
            let should_invoke = self
                .feedback
                .judge
                .get_or_insert_with(|| {
                    feedback_detector::JudgeDetector::new(adaptive_low, adaptive_high)
                })
                .should_invoke(regex_signal.as_ref());
            // Rate limit is checked second (and only if invocation is wanted)
            // because check_rate_limit mutates the detector's state.
            should_invoke
                && self
                    .feedback
                    .judge
                    .as_mut()
                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit)
        } else {
            // No classifier: only use a judge that already exists.
            self.feedback
                .judge
                .as_ref()
                .is_some_and(|jd| jd.should_invoke(regex_signal.as_ref()))
                && self
                    .feedback
                    .judge
                    .as_mut()
                    .is_some_and(feedback_detector::JudgeDetector::check_rate_limit)
        };
        let (signal, signal_source) = if judge_should_run {
            // Judge handles recording in its own background task; nothing to
            // record synchronously here.
            self.spawn_judge_correction_check(trimmed, conv_id);
            (None, "judge")
        } else {
            (regex_signal, "regex")
        };
        let Some(signal) = signal else { return };
        tracing::info!(
            kind = signal.kind.as_str(),
            confidence = signal.confidence,
            source = signal_source,
            "implicit correction detected"
        );
        // Cap persisted feedback text at 500 characters.
        let feedback_text = context::truncate_chars(&signal.feedback_text, 500);
        // Self-corrections reflect on the user, not the skill: skip the
        // rejection outcome for those.
        if self.is_learning_enabled()
            && signal.kind != feedback_detector::CorrectionKind::SelfCorrection
        {
            self.record_skill_outcomes(
                "user_rejection",
                Some(&feedback_text),
                Some(signal.kind.as_str()),
            )
            .await;
        }
        if let Some(memory) = &self.memory_state.memory {
            let correction_text = context::truncate_chars(trimmed, 500);
            match memory
                .sqlite()
                .store_user_correction(
                    conv_id.map(|c| c.0),
                    // No assistant snippet is stored on the regex path.
                    "",
                    &correction_text,
                    self.skill_state
                        .active_skill_names
                        .first()
                        .map(String::as_str),
                    signal.kind.as_str(),
                )
                .await
            {
                Ok(correction_id) => {
                    // Best-effort embedding; failure is logged, not fatal.
                    if let Err(e) = memory
                        .store_correction_embedding(correction_id, &correction_text)
                        .await
                    {
                        tracing::warn!("failed to store correction embedding: {e:#}");
                    }
                }
                Err(e) => tracing::warn!("failed to store user correction: {e:#}"),
            }
        }
    }
    /// Handles the explicit `/feedback <skill_name> <message>` command.
    ///
    /// Classifies the message as rejection or approval with the regex
    /// detector, records a skill outcome, and — when learning is enabled and
    /// the feedback is a rejection — triggers best-effort skill improvement.
    ///
    /// # Errors
    /// Returns an error if sending a reply over the channel or recording the
    /// outcome in memory fails.
    pub(super) async fn handle_feedback(&mut self, input: &str) -> Result<(), error::AgentError> {
        // Require both a skill name and a message.
        let Some((name, rest)) = input.split_once(' ') else {
            self.channel
                .send("Usage: /feedback <skill_name> <message>")
                .await?;
            return Ok(());
        };
        // Strip surrounding quotes users often add around the message.
        let (skill_name, feedback) = (name.trim(), rest.trim().trim_matches('"'));
        if feedback.is_empty() {
            self.channel
                .send("Usage: /feedback <skill_name> <message>")
                .await?;
            return Ok(());
        }
        let Some(memory) = &self.memory_state.memory else {
            self.channel.send("Memory not available.").await?;
            return Ok(());
        };
        // A regex-detected correction signal in the feedback marks it as a
        // rejection; anything else is treated as approval.
        let outcome_type = if self.feedback.detector.detect(feedback, &[]).is_some() {
            "user_rejection"
        } else {
            "user_approval"
        };
        memory
            .sqlite()
            .record_skill_outcome(
                skill_name,
                None,
                self.memory_state.conversation_id,
                outcome_type,
                None,
                Some(feedback),
            )
            .await?;
        if self.is_learning_enabled() && outcome_type == "user_rejection" {
            // Improvement is best-effort: errors are deliberately discarded.
            self.generate_improved_skill(skill_name, feedback, "", Some(feedback))
                .await
                .ok();
        }
        self.channel
            .send(&format!("Feedback recorded for \"{skill_name}\"."))
            .await?;
        Ok(())
    }
}