1use crate::streaming::{create_text_stream, StreamHandler, TextStream};
4use crate::templates::compose_prompt_from_state;
5use crate::training::TrainingCollector;
6use crate::types::*;
7use crate::{ZoeyError, Result};
8
9use std::sync::OnceLock;
10use std::sync::{Arc, RwLock};
11use std::time::Instant;
12use tracing::{debug, info, warn};
13
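/// Orchestrates the message pipeline: persists the incoming message, decides whether to
/// respond, runs Phase-0 preprocessing, composes provider state, generates a reply via
/// the configured LLM, runs actions and evaluators, and stores the responses.
///
/// Usage sketch (call shape only; constructing the runtime, `Memory`, and `Room` is shown
/// in the tests at the bottom of this file):
///
/// ```ignore
/// let responses = processor.process_message(message, room).await?;
/// for reply in &responses {
///     println!("{}", reply.content.text);
/// }
/// ```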
14pub struct MessageProcessor {
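    /// Shared handle to the agent runtime (settings, character, model handlers, adapters).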
16 runtime: Arc<RwLock<crate::AgentRuntime>>,
17
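    /// Optional collector that records conversation turns and thoughts as training data.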
18 training_collector: Option<Arc<TrainingCollector>>,
20}
21
22impl MessageProcessor {
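    /// Creates a processor without training-data collection.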
23 pub fn new(runtime: Arc<RwLock<crate::AgentRuntime>>) -> Self {
25 Self {
26 runtime,
27 training_collector: None,
28 }
29 }
30
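    /// Creates a processor that also records conversation turns via the given collector.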
31 pub fn with_training(
33 runtime: Arc<RwLock<crate::AgentRuntime>>,
34 training_collector: Arc<TrainingCollector>,
35 ) -> Self {
36 Self {
37 runtime,
38 training_collector: Some(training_collector),
39 }
40 }
41
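    /// Runs the full pipeline for one incoming message and returns the stored response
    /// memories (empty when the agent decides not to respond or defers the reply).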
42 pub async fn process_message(&self, message: Memory, room: Room) -> Result<Vec<Memory>> {
44 let span =
45 tracing::info_span!("message_processing", message_id = %message.id, duration_ms = 0i64);
46 let _enter = span.enter();
47 let _start = Instant::now();
48 info!(
49 "INTERACTION_REQUEST id={} room_id={} entity_id={} text_len={} text_preview={}",
50 message.id,
51 message.room_id,
52 message.entity_id,
53 message.content.text.len(),
54 message.content.text.chars().take(120).collect::<String>()
55 );
56
57 info!("INTERACTION_STORE message_id={} table=messages", message.id);
59 let adapter_opt = self.runtime.read().unwrap().adapter.read().unwrap().clone();
62 if let Some(adapter) = adapter_opt.as_ref() {
63 match adapter.create_memory(&message, "messages").await {
64 Ok(id) => info!("Message stored with ID: {}", id),
65 Err(e) => warn!("Failed to store message: {}", e),
66 }
67 } else {
68 warn!("No database adapter configured - message not stored");
69 }
70
71 debug!("Determining if should respond");
73 let should_respond = self.should_respond(&message, &room).await?;
        let mut message = message;

        // Delayed-reassessment window: merge rapid follow-ups into the current message,
        // or defer the reply when the previous turn looked incomplete.
        {
77 let mut rt = self.runtime.write().unwrap();
78 let enabled = rt
79 .get_setting("AUTONOMOUS_DELAYED_REASSESSMENT")
80 .and_then(|v| v.as_bool())
81 .unwrap_or(false);
82 if enabled {
83 if let Some((ts, prev)) =
84 crate::utils::delayed_reassessment::DelayedReassessment::pending(
85 &rt,
86 message.room_id,
87 )
88 {
89 if crate::utils::delayed_reassessment::DelayedReassessment::should_wait(ts) {
90 let merged = crate::utils::delayed_reassessment::DelayedReassessment::merge(
91 &prev,
92 &message.content.text,
93 );
94 crate::utils::delayed_reassessment::DelayedReassessment::clear(
95 &mut rt,
96 message.room_id,
97 );
98 message.content.text = merged;
99 } else {
100 crate::utils::delayed_reassessment::DelayedReassessment::clear(
101 &mut rt,
102 message.room_id,
103 );
104 }
105 } else {
106 let incomplete = rt
107 .get_setting("ui:incomplete")
108 .and_then(|v| v.as_bool())
109 .unwrap_or(false);
110 if incomplete {
111 crate::utils::delayed_reassessment::DelayedReassessment::start(
112 &mut rt,
113 message.room_id,
114 &message.content.text,
115 );
116 info!("Deferred response via delayed reassessment window: room_id={} message_id={}", message.room_id, message.id);
117 return Ok(vec![]);
118 }
119 }
120 }
121 }
122
123 if !should_respond {
124 info!("Decided not to respond to message");
125 return Ok(vec![]);
126 }
127
128 {
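            // Phase-0 preprocessing: extract tone, language, intent, topics, keywords,
            // entities, and complexity, then update per-room conversation-rhythm stats.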
130 let enabled = {
131 let rt = self.runtime.read().unwrap();
132 rt.get_setting("ui:phase0_enabled")
133 .and_then(|v| v.as_bool())
134 .unwrap_or(true)
135 };
136 if enabled {
137 let pre = crate::preprocessor::Phase0Preprocessor::new(self.runtime.clone());
138 let phase0 = pre.execute(&message).await;
139 if let Ok(res) = phase0 {
140 if let Some(tone) = res.tone.as_ref() {
141 let mut rt = self.runtime.write().unwrap();
142 rt.set_setting("ui:tone", serde_json::json!(tone), false);
143 }
144 if let Some(lang) = res.language.as_ref() {
145 let mut rt = self.runtime.write().unwrap();
146 rt.set_setting("ui:language", serde_json::json!(lang), false);
147 }
148 if let Some(intent) = res.intent.as_ref() {
149 let mut rt = self.runtime.write().unwrap();
150 rt.set_setting("ui:intent", serde_json::json!(intent), false);
151 }
152 let mut rt = self.runtime.write().unwrap();
153 if !res.topics.is_empty() {
154 rt.set_setting("ui:topics", serde_json::json!(res.topics), false);
155 }
156 if !res.keywords.is_empty() {
157 rt.set_setting("ui:keywords", serde_json::json!(res.keywords), false);
158 }
159 if !res.entities.is_empty() {
160 rt.set_setting("ui:entities", serde_json::json!(res.entities), false);
161 }
162 if let Some(comp) = res.complexity.as_ref() {
163 rt.set_setting(
164 "ui:complexity",
165 serde_json::to_value(comp).unwrap_or(serde_json::Value::Null),
166 false,
167 );
168 }
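                    // Rolling per-room rhythm stats: average message length, message
                    // velocity, topic drift, and a suggested response length.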
169 let room_id = message.room_id;
170 let avg_key = format!("rhythm:{}:avg_len", room_id);
171 let win_key = format!("rhythm:{}:window", room_id);
172 let mut avg = rt
173 .get_setting(&avg_key)
174 .and_then(|v| v.as_f64())
175 .unwrap_or(0.0);
176 let count = rt
177 .get_setting(&win_key)
178 .and_then(|v| v.as_array().map(|a| a.len()))
179 .unwrap_or(0);
180 let len = message.content.text.len() as f64;
181 avg = if count == 0 {
182 len
183 } else {
184 (avg * (count as f64) + len) / ((count as f64) + 1.0)
185 };
186 rt.set_setting(&avg_key, serde_json::json!(avg), false);
187 let mut window = rt
188 .get_setting(&win_key)
189 .and_then(|v| v.as_array().cloned())
190 .unwrap_or_default();
191 window.push(serde_json::json!(chrono::Utc::now().timestamp()));
192 while window.len() > 10 {
193 window.remove(0);
194 }
195 rt.set_setting(&win_key, serde_json::json!(window), false);
196 let velocity = if window.len() >= 2 {
197 let first = window.first().and_then(|v| v.as_i64()).unwrap_or(0);
198 let last = window.last().and_then(|v| v.as_i64()).unwrap_or(0);
199 let dt = (last - first) as f64;
200 if dt <= 0.0 {
201 0.0
202 } else {
203 (window.len() as f64) / (dt / 60.0)
204 }
205 } else {
206 0.0
207 };
208 rt.set_setting(
209 &format!("rhythm:{}:velocity", room_id),
210 serde_json::json!(velocity),
211 false,
212 );
213 let prev_topics = rt
214 .get_setting(&format!("rhythm:{}:recentTopics", room_id))
215 .and_then(|v| v.as_array().cloned())
216 .unwrap_or_default();
217 rt.set_setting(
218 &format!("rhythm:{}:recentTopics", room_id),
219 serde_json::json!(res.topics.clone()),
220 false,
221 );
222 let prev: Vec<String> = prev_topics
223 .iter()
224 .filter_map(|v| v.as_str().map(|s| s.to_string()))
225 .collect();
226 let overlap = if prev.is_empty() || res.topics.is_empty() {
227 0.0
228 } else {
229 let set_prev: std::collections::HashSet<_> = prev.iter().collect();
230 let inter =
231 res.topics.iter().filter(|t| set_prev.contains(t)).count() as f64;
232 inter / (res.topics.len() as f64)
233 };
234 let drift = overlap < 0.2;
235 rt.set_setting("ui:possibleTopicShift", serde_json::json!(drift), false);
236 let suggested = if velocity > 5.0 {
237 "terse"
238 } else if velocity > 2.0 {
239 "brief"
240 } else if avg > 300.0 {
241 "detailed"
242 } else {
243 "moderate"
244 };
245 rt.set_setting(
246 "ui:suggestedResponseLength",
247 serde_json::json!(suggested),
248 false,
249 );
250 }
251 }
252 }
253
254 debug!("Composing state from providers");
256 let mut state = self.compose_state_with_runtime_ref(&message).await?;
257
258 debug!("Generating response with LLM");
262 let response_text = self.generate_response(&message, &state).await?;
263 info!(
264 "INTERACTION_RESPONSE room_id={} text_preview={}",
265 message.room_id,
266 response_text.chars().take(120).collect::<String>()
267 );
268
269 debug!("Processing actions");
271 let _action_results = self.process_actions(&message, &state).await?;
272
273 let agent_id = {
275 let rt = self.runtime.read().unwrap();
276 rt.agent_id
277 };
278
        let mut response_memories = vec![Memory {
280 id: uuid::Uuid::new_v4(),
281 entity_id: agent_id,
282 agent_id,
283 room_id: message.room_id,
284 content: Content {
285 text: response_text.clone(),
286 source: message.content.source.clone(),
287 ..Default::default()
288 },
289 embedding: None,
290 metadata: None,
291 created_at: chrono::Utc::now().timestamp(),
292 unique: Some(false),
293 similarity: None,
294 }];
295
298 let mut recorded_sample_id: Option<uuid::Uuid> = None;
299 {
300 let fast_mode = {
301 let rt = self.runtime.read().unwrap();
302 rt.get_setting("ui:fast_mode")
303 .and_then(|v| v.as_bool())
304 .unwrap_or(false)
305 };
306 if !fast_mode {
307 if let Some(ref collector) = self.training_collector {
308 if let Some(response_mem) = response_memories.first() {
309 let thought = response_mem.content.thought.clone();
310 match collector
311 .record_conversation_turn(&message, response_mem, thought, &state)
312 .await
313 {
314 Ok(id) => {
315 recorded_sample_id = Some(id);
316 let meta = MemoryMetadata {
317 memory_type: Some("message".to_string()),
318 entity_name: None,
319 data: {
320 let mut m = std::collections::HashMap::new();
321 m.insert(
322 "training_sample_id".to_string(),
323 serde_json::json!(id.to_string()),
324 );
325 m
326 },
327 };
328 response_memories[0].metadata = Some(meta);
329 }
                            Err(e) => warn!("Failed to record training sample: {}", e),
331 }
332 }
333 }
334 }
335 }
336
337 {
339 let fast_mode = {
340 let rt = self.runtime.read().unwrap();
341 rt.get_setting("ui:fast_mode")
342 .and_then(|v| v.as_bool())
343 .unwrap_or(false)
344 };
345 if fast_mode {
346 debug!("Fast mode enabled: skipping evaluators");
347 } else {
                info!(
                    "INTERACTION_EVALUATORS_START room_id={} message_id={} responses={}",
                    message.room_id,
                    message.id,
                    response_memories.len()
                );
                self.evaluate(&message, &state, true, &response_memories)
                    .await?;
358 info!(
359 "INTERACTION_EVALUATORS_DONE room_id={} message_id={}",
360 message.room_id, message.id
361 );
362 }
363 }
364
365 for response in &response_memories {
            let adapter_opt = self.runtime.read().unwrap().adapter.read().unwrap().clone();
            if let Some(adapter) = adapter_opt.as_ref() {
376 match adapter.create_memory(response, "messages").await {
377 Ok(id) => info!("INTERACTION_STORE response_id={} table=messages", id),
378 Err(e) => warn!("Failed to store response: {}", e),
379 }
380 }
381 }
382
383 {
385 let fast_mode = {
386 let rt = self.runtime.read().unwrap();
387 rt.get_setting("ui:fast_mode")
388 .and_then(|v| v.as_bool())
389 .unwrap_or(false)
390 };
391 if !fast_mode {
392 if let (Some(ref collector), Some(sample_id), Some(resp)) = (
393 self.training_collector.as_ref(),
394 recorded_sample_id,
395 response_memories.first(),
396 ) {
397 let key = format!("training:review:{}", resp.id);
398 let (score_opt, note_opt) = {
399 let rt = self.runtime.read().unwrap();
400 let score = rt
401 .get_setting(&(key.clone() + ":score"))
402 .and_then(|v| v.as_f64())
403 .map(|v| v as f32);
404 let note = rt
405 .get_setting(&(key.clone() + ":note"))
406 .and_then(|v| v.as_str().map(|s| s.to_string()));
407 (score, note)
408 };
409 if let Some(review_score) = score_opt {
410 let rlhf_enabled = collector.is_rlhf_enabled();
411 if rlhf_enabled {
412 let mapped = (review_score * 2.0) - 1.0;
413 let _ = collector.add_feedback(sample_id, mapped, note_opt).await;
414 } else {
415 let _ = collector
416 .add_review(sample_id, review_score, note_opt)
417 .await;
418 }
419 }
420 }
421 }
422 }
423
424 {
426 let mut rt = self.runtime.write().unwrap();
427 let key = format!("ui:lastAddressed:{}", message.room_id);
428 rt.set_setting(
429 &key,
430 serde_json::json!(chrono::Utc::now().timestamp()),
431 false,
432 );
433 }
434
435 debug!("Emitting MESSAGE_SENT event");
437 let handler_count = {
438 let rt = self.runtime.read().unwrap();
439 let events = rt.events.read().unwrap();
440 events.get("MESSAGE_SENT").map(|h| h.len()).unwrap_or(0)
441 };
442
443 if handler_count > 0 {
444 debug!("Would invoke {} MESSAGE_SENT event handlers", handler_count);
445 }
446
447 info!(
448 "✓ Message processing complete - {} response(s) generated and stored",
449 response_memories.len()
450 );
451 let _elapsed = _start.elapsed().as_millis() as i64;
452 span.record("duration_ms", &_elapsed);
453 Ok(response_memories)
454 }
455
456 async fn should_respond(&self, message: &Memory, room: &Room) -> Result<bool> {
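        // Always respond in DM/voice-DM/API channels; in group channels require being
        // addressed, a name mention, a directive-style message, or a recent-address TTL.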
458 match room.channel_type {
460 ChannelType::Dm | ChannelType::VoiceDm | ChannelType::Api => Ok(true),
461 _ => {
462 let addressed = message
463 .content
464 .metadata
465 .get("addressed_to_me")
466 .and_then(|v| v.as_bool())
467 .unwrap_or(false);
468 if addressed {
469 return Ok(true);
470 }
471 let rt = self.runtime.read().unwrap();
472 let agent_name = &rt.character.name;
473 let text_lc = message.content.text.to_lowercase();
474 let mentioned = text_lc.contains(&agent_name.to_lowercase());
475 let intent_directive = text_lc.starts_with("please ")
476 || text_lc.starts_with("can you")
477 || text_lc.contains("help me")
478 || text_lc.contains("what is")
479 || text_lc.contains("how do");
480 let ttl_ok = {
481 let key = format!("ui:lastAddressed:{}", message.room_id);
482 if let Some(ts) = rt.get_setting(&key).and_then(|v| v.as_i64()) {
483 let now = chrono::Utc::now().timestamp();
484 let elapsed = now - ts;
                        elapsed <= 600
                    } else {
487 false
488 }
489 };
490 Ok(mentioned || intent_directive || ttl_ok)
491 }
492 }
493 }
494
495 async fn generate_response(&self, message: &Memory, state: &State) -> Result<String> {
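        // Prefer the character's custom message template and fall back to the default.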
497 let template_owned = {
499 let rt = self.runtime.read().unwrap();
500 if let Some(ref templates) = rt.character.templates {
501 templates.message_handler_template.clone()
502 } else {
503 None
504 }
505 };
506 let template_str = template_owned
507 .as_deref()
508 .unwrap_or(crate::templates::MESSAGE_HANDLER_TEMPLATE);
        let prompt = compose_prompt_from_state(state, template_str).unwrap_or_else(|_| {
510 format!(
512 "You are ZoeyBot, a helpful AI assistant.\n\
513 User message: {}\n\
514 Respond helpfully in XML format with <thought> and <text> tags.",
515 message.content.text
516 )
517 });
518
519 let streaming_enabled = {
522 let rt = self.runtime.read().unwrap();
523 rt.get_setting("ui:streaming")
524 .and_then(|v| v.as_bool())
525 .unwrap_or_else(|| {
526 std::env::var("UI_STREAMING")
527 .map(|v| v.eq_ignore_ascii_case("true"))
528 .unwrap_or(false)
529 })
530 };
531
532 {
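            // Heuristic temperature: lower for factual questions, higher for creative asks.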
534 let text_lc = message.content.text.to_lowercase();
535 let factual = text_lc.contains('?')
536 || text_lc.starts_with("what")
537 || text_lc.starts_with("how")
538 || text_lc.starts_with("why")
539 || text_lc.starts_with("when")
540 || text_lc.starts_with("where");
541 let creative = text_lc.contains("brainstorm")
542 || text_lc.contains("ideas")
543 || text_lc.contains("suggestions")
544 || text_lc.contains("think of");
545 let target_temp = if factual {
546 0.4
547 } else if creative {
548 0.8
549 } else {
550 0.7
551 };
552 let mut rt = self.runtime.write().unwrap();
553 rt.set_setting("ui:temperature", serde_json::json!(target_temp), false);
554 }
555
556 let prompt_debug = {
558 let rt = self.runtime.read().unwrap();
559 rt.get_setting("ui:prompt_debug")
560 .and_then(|v| v.as_bool())
561 .unwrap_or_else(|| {
562 std::env::var("UI_PROMPT_DEBUG")
563 .map(|v| v.eq_ignore_ascii_case("true"))
564 .unwrap_or(false)
565 })
566 };
567 if prompt_debug {
568 debug!("╔════════════════════════════════════════════════════════════════");
569 debug!("║ LLM PROMPT CONTEXT ({} chars)", prompt.len());
570 debug!("╠════════════════════════════════════════════════════════════════");
571 for (i, line) in prompt.lines().take(50).enumerate() {
572 debug!("║ {:3} │ {}", i + 1, line);
573 }
574 if prompt.lines().count() > 50 {
575 debug!("║ ... ({} more lines)", prompt.lines().count() - 50);
576 }
577 debug!("╚════════════════════════════════════════════════════════════════");
578 }
579
580 let raw_response = self.call_llm(&prompt).await?;
583
584 info!("LLM response received ({} chars)", raw_response.len());
585
586 let (thought, response_text_raw) = self.parse_llm_response(&raw_response);
588 let mut response_text = {
589 use regex::Regex;
590 let mut t = response_text_raw.clone();
591 let action_re =
592 Regex::new(r"(?i)^\s*(REPLY|SEND_MESSAGE|IGNORE|NONE)\b[:\-]?\s*").unwrap();
593 t = action_re.replace_all(&t, "").to_string();
594 let action_line_re =
595 Regex::new(r"(?mi)^\s*(REPLY|SEND_MESSAGE|IGNORE|NONE)\b[:\-]?.*$\n?").unwrap();
596 t = action_line_re.replace_all(&t, "").to_string();
597 let html_re = Regex::new(r"(?is)</?[^>]+>").unwrap();
598 t = html_re.replace_all(&t, "").to_string();
599 let dbl_nl = Regex::new(r"\n\n+").unwrap();
601 t = dbl_nl.replace_all(&t, "\n").to_string();
602 let meta_re = Regex::new(
603 r"(?mi)^\s*(The user .*|As an AI.*|I will .*|I can .* assist.*)\.?\s*$\n?",
604 )
605 .unwrap();
606 t = meta_re.replace_all(&t, "").to_string();
607 let leading_punct = Regex::new(r"^[\s,.;:!\-]+").unwrap();
608 t = leading_punct.replace_all(&t, "").to_string();
609 t.trim().to_string()
610 };
611
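        // Heuristic truncation check: a long reply that does not end with terminal punctuation.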
612 let looks_truncated = {
614 let s = response_text.trim_end();
615 if s.len() < 80 {
616 false
617 } else {
618 match s.chars().rev().find(|c| !c.is_whitespace()) {
619 Some(c) => {
620 let enders = ['.', '!', '?', '”', '’', '"', '\'', ')', ']', '}'];
621 !enders.contains(&c)
622 }
623 None => false,
624 }
625 }
626 };
627 if looks_truncated {
628 if streaming_enabled {
629 if let Ok(cont) = self.continue_response(&response_text).await {
630 if !cont.is_empty() {
631 response_text = format!("{} {}", response_text, cont);
632 }
633 }
634 } else {
635 let trimmed = response_text.trim_end();
637 response_text = format!("{}.", trimmed);
638 }
639 }
640
641 if let Some(ref thought_text) = thought {
643 if let Some(ref collector) = self.training_collector {
644 use crate::runtime_ref::RuntimeRef;
646 let runtime_ref = Arc::new(RuntimeRef::new(&self.runtime));
647 let runtime_any = runtime_ref.as_any_arc();
648
            // Default heuristic quality score for stored thoughts.
            let quality_score = 0.7;
            match collector
651 .store_thought(runtime_any, thought_text, message, quality_score)
652 .await
653 {
654 Ok(id) => debug!("Stored thought with ID: {}", id),
655 Err(e) => warn!("Failed to store thought: {}", e),
656 }
657 } else {
658 self.store_thought_direct(&thought_text, message).await?;
660 }
661 }
662
663 Ok(response_text)
664 }
665
666 async fn continue_response(&self, prev_text: &str) -> Result<String> {
667 let prompt = format!(
668 "Continue the assistant's previous response naturally. Do not repeat content.\n\nPrevious:\n{}\n\nRespond in XML format:\n<response>\n<thought></thought>\n<actions>REPLY</actions>\n<text>",
669 prev_text
670 );
671 let raw = self.call_llm(&prompt).await?;
672 let (_, text) = self.parse_llm_response(&raw);
673 let cleaned = {
674 use regex::Regex;
675 let mut t = text;
676 let html_re = Regex::new(r"(?is)</?[^>]+>").unwrap();
677 t = html_re.replace_all(&t, "").to_string();
678 let action_re =
679 Regex::new(r"(?i)^\s*(REPLY|SEND_MESSAGE|IGNORE|NONE)\b[:\-]?\s*").unwrap();
680 t = action_re.replace_all(&t, "").to_string();
681 let leading = Regex::new(r"^[\s,.;:!\-]+").unwrap();
682 t = leading.replace_all(&t, "").to_string();
683 t.trim().to_string()
684 };
685 Ok(cleaned)
686 }
687
688 async fn call_llm(&self, prompt: &str) -> Result<String> {
690 static COST_CALC: OnceLock<crate::planner::cost::CostCalculator> = OnceLock::new();
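        // Snapshot the preferred provider and the TEXT_LARGE handlers under the read lock.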
691 let (preferred_provider, model_handlers) = {
695 let rt = self.runtime.read().unwrap();
696 let models = rt.models.read().unwrap();
697
698 let provider_pref = rt
700 .get_setting("model_provider")
701 .and_then(|v| v.as_str().map(|s| s.to_string()));
702
703 for (model_type, handlers) in models.iter() {
705 debug!(
706 "Available model type '{}': {} handler(s)",
707 model_type,
708 handlers.len()
709 );
710 for (idx, handler) in handlers.iter().enumerate() {
711 debug!(
712 " [{}] {} (priority: {})",
713 idx, handler.name, handler.priority
714 );
715 }
716 }
717
718 (provider_pref, models.get("TEXT_LARGE").cloned())
719 };
720
721 if let Some(handlers) = model_handlers {
722 let provider = if let Some(pref) = preferred_provider.as_ref() {
724 info!("🎯 Looking for preferred provider: {}", pref);
725
726 let pref_lc = pref.to_lowercase();
729 let is_local_alias = matches!(pref_lc.as_str(), "ollama" | "local" | "llama" | "llamacpp" | "localai");
730
731 let matching = handlers.iter().find(|h| {
733 let h_lc = h.name.to_lowercase();
734 h_lc.contains(&pref_lc) || pref_lc.contains(&h_lc)
736 || (is_local_alias && (h_lc.contains("local") || h_lc.contains("llm")))
738 });
739
740 if let Some(matched) = matching {
741 info!("✓ Found matching provider: {}", matched.name);
742 Some(matched.clone())
743 } else {
744 warn!(
745 "⚠️ Preferred provider '{}' not found, using highest priority",
746 pref
747 );
748 handlers.first().cloned()
749 }
750 } else {
751 handlers.first().cloned()
753 };
754
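            // Optional provider racing: run the top candidates concurrently and take the
            // first successful response (only when streaming and no provider preference).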
755 let do_race = {
756 let rt = self.runtime.read().unwrap();
757 let race_setting = rt
758 .get_setting("ui:provider_racing")
759 .and_then(|v| v.as_bool())
760 .unwrap_or_else(|| {
761 std::env::var("UI_PROVIDER_RACING")
762 .map(|v| v.eq_ignore_ascii_case("true"))
763 .unwrap_or(false)
764 });
765 let streaming_ctx = rt
766 .get_setting("ui:streaming")
767 .and_then(|v| v.as_bool())
768 .unwrap_or(false);
769 race_setting && streaming_ctx && preferred_provider.is_none() && handlers.len() > 1
770 };
771
772 if do_race {
773 use tokio::task::JoinSet;
774 let mut js: JoinSet<Result<String>> = JoinSet::new();
775 let max_candidates = 3usize.min(handlers.len());
776 for p in handlers.iter().take(max_candidates) {
777 let name_lc = p.name.to_lowercase();
778 let (preferred_model, temp, max_tokens) = {
779 let rt = self.runtime.read().unwrap();
780 let model = if name_lc.contains("openai") {
781 rt.get_setting("OPENAI_MODEL")
782 .and_then(|v| v.as_str().map(|s| s.to_string()))
783 } else if name_lc.contains("anthropic") || name_lc.contains("claude") {
784 rt.get_setting("ANTHROPIC_MODEL")
785 .and_then(|v| v.as_str().map(|s| s.to_string()))
786 } else {
787 rt.get_setting("LOCAL_LLM_MODEL")
788 .and_then(|v| v.as_str().map(|s| s.to_string()))
789 };
790 let temp = rt
791 .get_setting("ui:temperature")
792 .and_then(|v| v.as_f64().map(|f| f as f32))
793 .or_else(|| {
794 rt.get_setting("temperature")
795 .and_then(|v| v.as_f64().map(|f| f as f32))
796 })
797 .unwrap_or(0.7);
798 let base_tokens = if name_lc.contains("openai")
799 || name_lc.contains("anthropic")
800 || name_lc.contains("claude")
801 {
802 rt.get_setting("max_tokens")
803 .and_then(|v| v.as_u64().map(|u| u as usize))
804 .unwrap_or(150)
805 } else {
806 rt.get_setting("LOCAL_LLM_MAX_TOKENS")
807 .and_then(|v| v.as_u64().map(|u| u as usize))
808 .or_else(|| {
809 rt.get_setting("max_tokens")
810 .and_then(|v| v.as_u64().map(|u| u as usize))
811 })
812 .unwrap_or(150)
813 };
814 (model, temp, base_tokens)
815 };
816 let params = GenerateTextParams {
817 prompt: prompt.to_string(),
818 max_tokens: Some(max_tokens),
819 temperature: Some(temp),
820 top_p: None,
821 stop: None,
822 model: preferred_model,
823 frequency_penalty: None,
824 presence_penalty: None,
825 };
826 let mh_params = ModelHandlerParams {
827 runtime: Arc::new(()),
828 params,
829 };
830 js.spawn((p.handler)(mh_params));
831 }
832 while let Some(res) = js.join_next().await {
833 if let Ok(Ok(text)) = res {
834 return Ok(text);
835 }
836 }
837 } else if let Some(provider) = provider {
838 info!(
839 "🤖 Using LLM provider: {} (priority: {})",
840 provider.name, provider.priority
841 );
842
843 let (preferred_model, temp, max_tokens) = {
845 let rt = self.runtime.read().unwrap();
846
847 let model = if provider.name.to_lowercase().contains("openai") {
849 rt.get_setting("OPENAI_MODEL")
850 .and_then(|v| v.as_str().map(|s| s.to_string()))
851 .or_else(|| {
852 rt.get_setting("openai_model")
853 .and_then(|v| v.as_str().map(|s| s.to_string()))
854 })
855 } else if provider.name.to_lowercase().contains("anthropic")
856 || provider.name.to_lowercase().contains("claude")
857 {
858 rt.get_setting("ANTHROPIC_MODEL")
859 .and_then(|v| v.as_str().map(|s| s.to_string()))
860 .or_else(|| {
861 rt.get_setting("anthropic_model")
862 .and_then(|v| v.as_str().map(|s| s.to_string()))
863 })
864 } else {
865 rt.get_setting("LOCAL_LLM_MODEL")
867 .and_then(|v| v.as_str().map(|s| s.to_string()))
868 .or_else(|| {
869 rt.get_setting("local_llm_model")
870 .and_then(|v| v.as_str().map(|s| s.to_string()))
871 })
872 };
873
874 let temp = rt
876 .get_setting("ui:temperature")
877 .and_then(|v| v.as_f64().map(|f| f as f32))
878 .or_else(|| {
879 rt.get_setting("temperature")
880 .and_then(|v| v.as_f64().map(|f| f as f32))
881 })
882 .unwrap_or(0.7);
883 let base_tokens = if provider.name.to_lowercase().contains("openai")
884 || provider.name.to_lowercase().contains("anthropic")
885 || provider.name.to_lowercase().contains("claude")
886 {
887 rt.get_setting("max_tokens")
888 .and_then(|v| v.as_u64().map(|u| u as usize))
889 .unwrap_or(150)
890 } else {
891 rt.get_setting("LOCAL_LLM_MAX_TOKENS")
892 .and_then(|v| v.as_u64().map(|u| u as usize))
893 .or_else(|| {
894 rt.get_setting("max_tokens")
895 .and_then(|v| v.as_u64().map(|u| u as usize))
896 })
897 .unwrap_or(150)
898 };
899 let mut tokens = {
900 let v = rt.get_setting("ui:verbosity");
901 if let Some(val) = v {
902 if let Some(n) = val.as_u64() {
903 n as usize
904 } else if let Some(s) = val.as_str() {
905 match s.to_lowercase().as_str() {
906 "short" => ((base_tokens as f64 * 0.6) as usize).max(32),
907 "normal" => base_tokens,
908 "long" | "verbose" => {
909 ((base_tokens as f64 * 1.5) as usize).min(base_tokens * 2)
910 }
911 _ => base_tokens,
912 }
913 } else {
914 base_tokens
915 }
916 } else {
917 base_tokens
918 }
919 };
920
921 let streaming_ctx = rt
924 .get_setting("ui:streaming")
925 .and_then(|v| v.as_bool())
926 .unwrap_or(false);
927 let avoid_cutoff = rt
928 .get_setting("ui:avoid_cutoff")
929 .and_then(|v| v.as_bool())
930 .unwrap_or(true);
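                    // When cutoff avoidance is on, raise max_tokens toward the model's
                    // output ceiling, with different floors for streaming contexts.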
931 if avoid_cutoff {
933 let calc = COST_CALC.get_or_init(crate::planner::cost::CostCalculator::new);
934 let model_key = model.clone().map(|m| m.to_string()).or_else(|| {
936 let name = provider.name.to_lowercase();
937 if name.contains("openai") {
938 Some("gpt-4o".to_string())
939 } else if name.contains("anthropic") || name.contains("claude") {
940 Some("claude-3.5-sonnet".to_string())
941 } else {
942 Some("local".to_string())
943 }
944 });
945 if let Some(model_name) = model_key.as_ref() {
946 if let Some(pricing) = calc.get_pricing(model_name) {
947 if streaming_ctx {
948 tokens = pricing.max_output_tokens.max(tokens).max(256);
950 } else {
951 tokens = tokens.max(pricing.max_output_tokens).max(512);
953 }
954 } else {
955 tokens = if streaming_ctx {
957 tokens.max(2048)
958 } else {
959 tokens.max(1024)
960 };
961 }
962 }
963 }
964 (model, temp, tokens)
965 };
966
967 let params = GenerateTextParams {
968 prompt: prompt.to_string(),
969 max_tokens: Some(max_tokens),
970 temperature: Some(temp),
971 top_p: None,
972 stop: None,
                    model: preferred_model,
                    frequency_penalty: None,
975 presence_penalty: None,
976 };
977
978 let model_params = ModelHandlerParams {
979 runtime: Arc::new(()),
980 params,
981 };
982
983 debug!(
984 "Calling model handler: {} (temp: {}, max_tokens: {})",
985 provider.name, temp, max_tokens
986 );
987
988 let streaming_enabled = {
990 let rt = self.runtime.read().unwrap();
991 rt.get_setting("ui:streaming")
992 .and_then(|v| v.as_bool())
993 .unwrap_or(false)
994 };
995
996 if streaming_enabled && !provider.name.to_lowercase().contains("openai") {
997 match self.call_ollama_direct(prompt).await {
999 Ok(text) => return Ok(text),
1000 Err(e) => {
1001 warn!(
1002 "Streaming call failed: {}. Falling back to non-streaming.",
1003 e
1004 );
1005 }
1006 }
1007 }
1008
1009 match (provider.handler)(model_params).await {
1010 Ok(text) => {
1011 info!(
1012 "✓ LLM response received via {} ({} chars)",
1013 provider.name,
1014 text.len()
1015 );
1016 debug!("╔════════════════════════════════════════════════════════════════");
1017 debug!(
1018 "║ LLM RESPONSE from {} ({} chars)",
1019 provider.name,
1020 text.len()
1021 );
1022 debug!("╠════════════════════════════════════════════════════════════════");
1023 for (i, line) in text.lines().take(20).enumerate() {
1024 debug!("║ {:3} │ {}", i + 1, line);
1025 }
1026 if text.lines().count() > 20 {
1027 debug!("║ ... ({} more lines)", text.lines().count() - 20);
1028 }
1029 debug!("╚════════════════════════════════════════════════════════════════");
1030 return Ok(text);
1031 }
1032 Err(e) => {
1033 warn!("⚠️ Model handler {} failed: {}", provider.name, e);
1034 warn!("Trying fallback method...");
1035 }
1036 }
1037 } else {
1038 warn!("No model handlers registered for TEXT_LARGE");
1039 }
1040 } else {
1041 warn!("No model handlers found in registry");
1042 }
1043
1044 match preferred_provider.as_ref().map(|s| s.to_lowercase()) {
1046 Some(pref)
1047 if pref.contains("local") || pref.contains("ollama") || pref.contains("llama") =>
1048 {
1049 warn!(
1050 "Falling back to direct Ollama call (preferred provider: {})",
1051 pref
1052 );
1053 self.call_ollama_direct(prompt).await
1054 }
            Some(pref) => {
                warn!(
                    "No handler matched preferred provider '{}'; returning safe fallback reply",
                    pref
                );
                let safe_reply = "<response><thought>Fallback local reasoning</thought><actions>REPLY</actions><text>Okay.</text></response>";
                Ok(safe_reply.to_string())
            }
1060 None => {
1061 warn!("Falling back to direct Ollama call (no preferred provider set)");
1063 self.call_ollama_direct(prompt).await
1064 }
1065 }
1066 }
1067
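    /// Produces a chunked text stream. The full response is generated first and then
    /// re-emitted in fixed-size chunks, so this simulates streaming rather than
    /// forwarding tokens as the model produces them.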
1068 pub async fn generate_response_stream(
1070 &self,
1071 message: &Memory,
1072 state: &State,
1073 ) -> Result<TextStream> {
1074 let (sender, receiver) = create_text_stream(64);
1075 let handler = StreamHandler::new(sender);
1076
1077 let final_text = self.generate_response(message, state).await?;
1078
        tokio::spawn(async move {
            // Emit the reply in fixed-size character chunks; slicing by characters
            // avoids panicking on multi-byte UTF-8 boundaries.
            let chars: Vec<char> = final_text.chars().collect();
            let chunk_size = 200usize;
            let mut idx = 0usize;
            while idx < chars.len() {
                let end = (idx + chunk_size).min(chars.len());
                let piece: String = chars[idx..end].iter().collect();
                let is_final = end >= chars.len();
                if handler.send_chunk(piece, is_final).await.is_err() {
                    break;
                }
                idx = end;
                if !is_final {
                    tokio::time::sleep(std::time::Duration::from_millis(150)).await;
                }
            }
        });
1095
1096 Ok(receiver)
1097 }
1098
1099 async fn call_ollama_direct(&self, prompt: &str) -> Result<String> {
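        // Read the local model name and endpoint from settings, with sensible defaults.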
1101 let (model_name, model_endpoint) = {
1102 let rt = self.runtime.read().unwrap();
1103 let model = rt
1104 .get_setting("LOCAL_LLM_MODEL")
1105 .and_then(|v| v.as_str().map(|s| s.to_string()))
1106 .unwrap_or_else(|| "phi3:mini".to_string());
1107
1108 let endpoint = rt
1109 .get_setting("LOCAL_LLM_ENDPOINT")
1110 .and_then(|v| v.as_str().map(|s| s.to_string()))
1111 .unwrap_or_else(|| "http://localhost:11434".to_string());
1112
1113 (model, endpoint)
1114 };
1115
1116 let client = reqwest::Client::new();
1117 let num_predict = {
1118 let rt = self.runtime.read().unwrap();
1119 rt.get_setting("LOCAL_LLM_MAX_TOKENS")
1120 .and_then(|v| v.as_u64().map(|u| u as usize))
1121 .or_else(|| {
1122 rt.get_setting("local_llm_num_predict")
1123 .and_then(|v| v.as_u64().map(|u| u as usize))
1124 })
1125 .or_else(|| {
1126 rt.get_setting("max_tokens")
1127 .and_then(|v| v.as_u64().map(|u| u as usize))
1128 })
1129 .unwrap_or(400)
1130 };
1131 let ollama_request = serde_json::json!({
1132 "model": model_name,
1133 "prompt": prompt,
1134 "stream": false,
1135 "options": {
1136 "temperature": 0.7,
1137 "num_predict": num_predict
1138 }
1139 });
1140
1141 debug!("Direct Ollama call: {} at {}", model_name, model_endpoint);
1142
1143 match client
1145 .post(format!("{}/api/generate", model_endpoint))
1146 .json(&ollama_request)
            // Local generation of a few hundred tokens typically needs far more than a few seconds.
            .timeout(std::time::Duration::from_secs(120))
1148 .send()
1149 .await
1150 {
1151 Ok(response) => {
1152 if let Ok(json) = response.json::<serde_json::Value>().await {
1153 if let Some(text) = json["response"].as_str() {
1154 return Ok(text.to_string());
1155 }
1156 }
1157 }
1158 Err(e) => {
1159 return Err(ZoeyError::model(format!("Ollama call failed: {}", e)));
1160 }
1161 }
1162
1163 Err(ZoeyError::model("No LLM response received"))
1164 }
1165
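    /// Persists an agent thought straight to the "thoughts" table when no training
    /// collector is configured.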
1166 async fn store_thought_direct(
1170 &self,
1171 thought_text: &str,
1172 original_message: &Memory,
1173 ) -> Result<()> {
1174 info!(
1175 "💭 Agent thought (direct): {}",
1176 thought_text.chars().take(100).collect::<String>()
1177 );
1178
1179 let agent_id = self.runtime.read().unwrap().agent_id;
1180
1181 let thought_memory = Memory {
1183 id: uuid::Uuid::new_v4(),
1184 entity_id: agent_id,
1185 agent_id,
1186 room_id: original_message.room_id,
1187 content: Content {
1188 text: thought_text.to_string(),
1189 source: Some("internal_thought".to_string()),
1190 thought: Some(thought_text.to_string()),
1191 ..Default::default()
1192 },
1193 embedding: None,
1194 metadata: Some(MemoryMetadata {
1195 memory_type: Some("thought".to_string()),
1196 entity_name: Some("ZoeyBot".to_string()),
1197 data: {
1198 let mut meta = std::collections::HashMap::new();
1199 meta.insert("purpose".to_string(), serde_json::json!("reflection"));
1200 meta.insert(
1201 "related_message".to_string(),
1202 serde_json::json!(original_message.id.to_string()),
1203 );
1204 meta.insert(
1205 "timestamp".to_string(),
1206 serde_json::json!(chrono::Utc::now().timestamp_millis()),
1207 );
1208 meta.insert(
1209 "can_be_used_for".to_string(),
1210 serde_json::json!([
1211 "decision_pattern_analysis",
1212 "response_improvement",
1213 "self_reflection",
1214 "training_data"
1215 ]),
1216 );
1217 meta
1218 },
1219 }),
1220 created_at: chrono::Utc::now().timestamp_millis(),
1221 unique: Some(false),
1222 similarity: None,
1223 };
1224
1225 let adapter_opt = self.runtime.read().unwrap().adapter.read().unwrap().clone();
1227 if let Some(adapter) = adapter_opt.as_ref() {
1228 match adapter.create_memory(&thought_memory, "thoughts").await {
1229 Ok(id) => {
1230 debug!("✓ Thought stored with ID: {}", id);
1231 info!("💾 Stored for: pattern analysis, improvement, reflection, training");
1232 }
1233 Err(e) => warn!("Failed to store thought: {}", e),
1234 }
1235 }
1236
1237 Ok(())
1238 }
1239
1240 fn parse_llm_response(&self, raw_response: &str) -> (Option<String>, String) {
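        // Primary path: a well-formed <response> envelope with optional <thought> and
        // <actions> sections; fall back to bare tags, then to stripping all markup.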
        let re = regex::Regex::new(r"(?is)<response[^>]*>(?:.*?<thought>\s*(.*?)\s*</thought>)?(?:.*?<actions>\s*(.*?)\s*</actions>)?.*?<text>\s*(.*?)\s*</text>.*?</response>").unwrap();
1245 if let Some(caps) = re.captures(raw_response) {
1246 let mut thought = caps.get(1).map(|m| m.as_str().trim().to_string());
1247 let actions_match = caps.get(2).map(|m| m.as_str());
1248 let text = caps.get(3).map(|m| m.as_str()).unwrap_or("");
1249 if let Some(actions_match) = actions_match {
1250 let parsed: Vec<String> = actions_match
1251 .split(',')
1252 .map(|s| s.trim().to_string())
1253 .filter(|s| !s.is_empty())
1254 .collect();
1255 if !parsed.is_empty() {
1256 let mut rt = self.runtime.write().unwrap();
1257 rt.set_setting("ui:lastParsedActions", serde_json::json!(parsed), false);
1258 }
1259 }
1260 if thought.is_none() {
1261 if let Some(thought_start) = raw_response.find("<thought>") {
1262 if let Some(thought_end) = raw_response.find("</thought>") {
1263 thought = Some(
1264 raw_response[thought_start + 9..thought_end]
1265 .trim()
1266 .to_string(),
1267 );
1268 }
1269 }
1270 }
1271 return (thought, text.trim().to_string());
1272 }
1273
1274 if let Some(text_start) = raw_response.find("<text>") {
1275 if let Some(text_end) = raw_response.find("</text>") {
1276 let text = &raw_response[text_start + 6..text_end];
1277 let thought = if let Some(thought_start) = raw_response.find("<thought>") {
1278 if let Some(thought_end) = raw_response.find("</thought>") {
1279 Some(
1280 raw_response[thought_start + 9..thought_end]
1281 .trim()
1282 .to_string(),
1283 )
1284 } else {
1285 None
1286 }
1287 } else {
1288 None
1289 };
1290 if let Some(actions_start) = raw_response.find("<actions>") {
1292 if let Some(actions_end) = raw_response.find("</actions>") {
1293 let actions_str = raw_response[actions_start + 9..actions_end].trim();
1294 let parsed: Vec<String> = actions_str
1295 .split(',')
1296 .map(|s| s.trim().to_string())
1297 .filter(|s| !s.is_empty())
1298 .collect();
1299 if !parsed.is_empty() {
1300 let mut rt = self.runtime.write().unwrap();
1302 rt.set_setting(
1303 "ui:lastParsedActions",
1304 serde_json::json!(parsed),
1305 false,
1306 );
1307 }
1308 }
1309 }
1310 return (thought, text.trim().to_string());
1311 }
1312 }
1313 let cleaned = regex::Regex::new("(?s)</?[^>]+>")
1314 .unwrap()
1315 .replace_all(raw_response, "")
1316 .to_string()
1317 .trim()
1318 .to_string();
1319 let final_text = if cleaned.is_empty() {
1320 raw_response.trim().to_string()
1321 } else {
1322 cleaned
1323 };
1324 (None, final_text)
1325 }
1326
1327 async fn process_actions(&self, message: &Memory, state: &State) -> Result<Vec<ActionResult>> {
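        // Execute REPLY first when it validates, then any additional actions the model
        // listed in <actions> (stored as ui:lastParsedActions).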
        // Clone the action list (as compose_state does for providers) so no runtime
        // locks are held across the awaits below.
        let actions = {
            let rt = self.runtime.read().unwrap();
            let list = rt.actions.read().unwrap();
            list.clone()
        };
1331
1332 let runtime_ref: Arc<dyn std::any::Any + Send + Sync> = Arc::new(());
1334
1335 let mut results: Vec<ActionResult> = Vec::new();
1337 if let Some(reply) = actions.iter().find(|a| a.name() == "REPLY") {
1338 match reply.validate(runtime_ref.clone(), message, state).await {
1339 Ok(true) => {
1340 debug!("Executing action: REPLY");
1341 if let Ok(Some(result)) = reply
1342 .handler(runtime_ref.clone(), message, state, None, None)
1343 .await
1344 {
1345 results.push(result);
1346 }
1347 }
1348 Ok(false) => debug!("REPLY action validation failed"),
1349 Err(e) => warn!("Action REPLY validate error: {}", e),
1350 }
1351 }
1352
1353 let planned = {
1355 let rt = self.runtime.read().unwrap();
1356 rt.get_setting("ui:lastParsedActions")
1357 .and_then(|v| v.as_array().cloned())
1358 .unwrap_or_default()
1359 };
1360 for name_val in planned {
1361 if let Some(name) = name_val.as_str() {
1362 if let Some(act) = actions.iter().find(|a| a.name().eq_ignore_ascii_case(name)) {
1363 match act.validate(runtime_ref.clone(), message, state).await {
1364 Ok(true) => {
1365 debug!("Executing additional action: {}", act.name());
1366 match act
1367 .handler(runtime_ref.clone(), message, state, None, None)
1368 .await
1369 {
1370 Ok(Some(res)) => results.push(res),
1371 Ok(None) => {}
1372 Err(e) => warn!("Action {} failed: {}", act.name(), e),
1373 }
1374 }
1375 Ok(false) => debug!("Additional action {} validation failed", act.name()),
1376 Err(e) => warn!("Action {} validate error: {}", act.name(), e),
1377 }
1378 }
1379 }
1380 }
1381
1382 Ok(results)
1383 }
1384
1385 async fn evaluate(
1387 &self,
1388 message: &Memory,
1389 state: &State,
1390 did_respond: bool,
1391 responses: &[Memory],
1392 ) -> Result<()> {
        // Clone the evaluator list so no runtime locks are held across the awaits below.
        let evaluators = {
            let rt = self.runtime.read().unwrap();
            let list = rt.evaluators.read().unwrap();
            list.clone()
        };
1395
1396 let runtime_ref: Arc<dyn std::any::Any + Send + Sync> = Arc::new(());
1398
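        // Run every evaluator that either always runs or validates for this message.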
1399 for evaluator in evaluators.iter() {
1400 let should_run = evaluator.always_run()
1402 || evaluator
1403 .validate(runtime_ref.clone(), message, state)
1404 .await
1405 .unwrap_or(false);
1406
1407 if should_run {
1408 debug!("Running evaluator: {}", evaluator.name());
1409 if let Err(e) = evaluator
1410 .handler(
1411 runtime_ref.clone(),
1412 message,
1413 state,
1414 did_respond,
1415 Some(responses.to_vec()),
1416 )
1417 .await
1418 {
1419 warn!("Evaluator {} failed: {}", evaluator.name(), e);
1420 }
1421 }
1422 }
1423
1424 Ok(())
1425 }
1426
1427 async fn compose_state_with_runtime_ref(&self, message: &Memory) -> Result<State> {
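        // Build state by running every registered provider, then layer in UI hints
        // (tone, verbosity, topics, etc.) captured earlier from runtime settings.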
1429 use crate::runtime_ref::RuntimeRef;
1430
1431 let runtime_ref = Arc::new(RuntimeRef::new(&self.runtime));
1433 let runtime_any = runtime_ref.as_any_arc();
1434
1435 let mut state = State::new();
1436
1437 let (providers, fast_mode) = {
1439 let rt = self.runtime.read().unwrap();
1440 let fast = rt
1441 .get_setting("ui:fast_mode")
1442 .and_then(|v| v.as_bool())
1443 .unwrap_or(false);
1444 let list = rt.providers.read().unwrap().clone();
1445 (list, fast)
1446 };
1447
1448 for provider in &providers {
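            // In fast mode, skip the heavier providers (planner, recall, session cues).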
1450 if fast_mode {
1451 let name = provider.name().to_lowercase();
1453 if name.contains("planner")
1454 || name.contains("recall")
1455 || name.contains("session_cues")
1456 {
1457 continue;
1458 }
1459 }
1460 debug!("Running provider: {}", provider.name());
1461
1462 match provider.get(runtime_any.clone(), message, &state).await {
1463 Ok(result) => {
1464 let mut has_output = false;
1465
1466 if let Some(ref text) = result.text {
1467 state.set_value(provider.name().to_uppercase(), text.clone());
1468 has_output = true;
1469 }
1470 if let Some(values) = result.values {
1471 for (k, v) in values {
1472 state.set_value(k, v);
1473 }
1474 has_output = true;
1475 }
1476 if let Some(ref data) = result.data {
1477 for (k, v) in data.clone() {
1478 state.set_data(k, v);
1479 }
1480 has_output = true;
1481 }
1482
1483 if provider.name() == "reaction_planner" || provider.name() == "output_planner"
1485 {
1486 if has_output {
1487 debug!(
1488 "╔════════════════════════════════════════════════════════════════"
1489 );
1490 debug!("║ {} OUTPUT", provider.name().to_uppercase());
1491 debug!(
1492 "╠════════════════════════════════════════════════════════════════"
1493 );
1494
1495 if let Some(ref text) = result.text {
1496 for line in text.lines() {
1497 debug!("║ {}", line);
1498 }
1499 }
1500
1501 if let Some(ref data) = result.data {
1502 if let Some(plan_data) = data.values().next() {
1503 debug!(
1504 "║ Data: {}",
1505 serde_json::to_string_pretty(plan_data).unwrap_or_default()
1506 );
1507 }
1508 }
1509
1510 debug!(
1511 "╚════════════════════════════════════════════════════════════════"
1512 );
1513 }
1514 }
1515 }
1516 Err(e) => {
1517 warn!("Provider {} failed: {}", provider.name(), e);
1518 }
1519 }
1520 }
1521 {
1522 let rt = self.runtime.read().unwrap();
1523 if let Some(tone) = rt
1525 .get_setting("ui:tone")
1526 .and_then(|v| v.as_str().map(|s| s.to_string()))
1527 {
1528 state.set_value("UI_TONE", tone);
1529 }
1530 if let Some(verb) = rt.get_setting("ui:verbosity") {
1531 let verb_s = if let Some(s) = verb.as_str() {
1532 s.to_string()
1533 } else {
1534 verb.to_string()
1535 };
1536 state.set_value("UI_VERBOSITY", verb_s);
1537 }
1538 if let Some(sug) = rt
1539 .get_setting("ui:suggestedResponseLength")
1540 .and_then(|v| v.as_str().map(|s| s.to_string()))
1541 {
1542 state.set_value("UI_SUGGESTED_RESPONSE_LENGTH", sug);
1543 }
1544 if let Some(shift) = rt
1545 .get_setting("ui:possibleTopicShift")
1546 .and_then(|v| v.as_bool())
1547 {
1548 state.set_value(
1549 "UI_TOPIC_SHIFT",
1550 if shift {
1551 "true".to_string()
1552 } else {
1553 "false".to_string()
1554 },
1555 );
1556 }
1557 let room_prefix = format!("ui:lastThought:{}:", message.room_id);
1558 let last_thoughts = rt
1559 .get_settings_with_prefix(&room_prefix)
1560 .into_iter()
1561 .map(|(_, v)| v)
1562 .collect::<Vec<String>>();
1563 if !last_thoughts.is_empty() {
1564 let summary = last_thoughts.join(" ");
1565 state.set_value("CONTEXT_LAST_THOUGHT", summary);
1566 }
1567 state.set_value("LAST_PROMPT", message.content.text.clone());
1568 if let Some(lang) = rt
1569 .get_setting("ui:language")
1570 .and_then(|v| v.as_str().map(|s| s.to_string()))
1571 {
1572 state.set_value("UI_LANGUAGE", lang);
1573 }
1574 if let Some(intent) = rt
1575 .get_setting("ui:intent")
1576 .and_then(|v| v.as_str().map(|s| s.to_string()))
1577 {
1578 state.set_value("UI_INTENT", intent);
1579 }
1580 if let Some(kw) = rt
1581 .get_setting("ui:keywords")
1582 .and_then(|v| v.as_array().cloned())
1583 {
1584 let joined = kw
1585 .into_iter()
1586 .filter_map(|x| x.as_str().map(|s| s.to_string()))
1587 .collect::<Vec<String>>()
1588 .join(", ");
1589 state.set_value("UI_KEYWORDS", joined);
1590 }
1591 if let Some(top) = rt
1592 .get_setting("ui:topics")
1593 .and_then(|v| v.as_array().cloned())
1594 {
1595 let joined = top
1596 .into_iter()
1597 .filter_map(|x| x.as_str().map(|s| s.to_string()))
1598 .collect::<Vec<String>>()
1599 .join(", ");
1600 state.set_value("UI_TOPICS", joined);
1601 }
1602 if let Some(ent) = rt
1603 .get_setting("ui:entities")
1604 .and_then(|v| v.as_array().cloned())
1605 {
1606 let joined = ent
1607 .into_iter()
1608 .filter_map(|x| x.as_str().map(|s| s.to_string()))
1609 .collect::<Vec<String>>()
1610 .join(", ");
1611 state.set_value("UI_ENTITIES", joined);
1612 }
1613 if let Some(arr) = rt
1614 .get_setting("phase0:agent_candidates")
1615 .and_then(|v| v.as_array().cloned())
1616 {
1617 let joined = arr
1618 .into_iter()
1619 .filter_map(|x| x.as_str().map(|s| s.to_string()))
1620 .collect::<Vec<String>>()
1621 .join(", ");
1622 state.set_value("UI_AGENT_CANDIDATES", joined);
1623 }
1624 if let Some(comp) = rt
1625 .get_setting("ui:complexity")
1626 .and_then(|v| v.as_object().cloned())
1627 {
1628 if let Some(level) = comp.get("level").and_then(|v| v.as_str()) {
1629 state.set_value("UI_COMPLEXITY_LEVEL", level.to_string());
1630 }
1631 if let Some(reasoning) = comp.get("reasoning").and_then(|v| v.as_str()) {
1632 state.set_value("UI_COMPLEXITY_REASONING", reasoning.to_string());
1633 }
1634 }
1635 }
1636
1637 {
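            // Estimate the prompt's token footprint and flag a compact context when it
            // would crowd the model's context window.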
1639 use crate::planner::cost::CostCalculator;
1640 use crate::planner::tokens::TokenCounter;
1641 let calc = CostCalculator::new();
1642 let template = crate::templates::MESSAGE_HANDLER_TEMPLATE;
1643 if let Ok(preview) = crate::templates::compose_prompt_from_state(&state, template) {
1644 let estimated_input = TokenCounter::estimate_tokens(&preview);
1645 let rt = self.runtime.read().unwrap();
1646 let provider_name = rt
1647 .models
1648 .read()
1649 .unwrap()
1650 .get("TEXT_LARGE")
1651 .and_then(|v| v.first())
1652 .map(|h| h.name.clone())
1653 .unwrap_or_else(|| "local".to_string());
1654 let model_key = if provider_name.to_lowercase().contains("openai") {
1655 "gpt-4o".to_string()
1656 } else if provider_name.to_lowercase().contains("claude")
1657 || provider_name.to_lowercase().contains("anthropic")
1658 {
1659 "claude-3.5-sonnet".to_string()
1660 } else {
1661 "local".to_string()
1662 };
1663 if let Some(pricing) = calc.get_pricing(&model_key) {
1664 let compact = estimated_input + 256 > pricing.context_window;
1665 state.set_value("UI_COMPACT_CONTEXT", if compact { "true" } else { "false" });
1666 }
1667 }
1668 }
1669
1670 Ok(state)
1671 }
1672}
1673
1674#[cfg(test)]
1675mod tests {
1676 use super::*;
1677 use crate::RuntimeOpts;
1679
1680 #[tokio::test]
1681 #[ignore = "Integration test - requires Ollama running at localhost:11434"]
1682 async fn test_message_processor() {
1683 let runtime = crate::AgentRuntime::new(RuntimeOpts::default())
1684 .await
1685 .unwrap();
1686 let processor = MessageProcessor::new(runtime);
1687
1688 let message = Memory {
1689 id: uuid::Uuid::new_v4(),
1690 entity_id: uuid::Uuid::new_v4(),
1691 agent_id: uuid::Uuid::new_v4(),
1692 room_id: uuid::Uuid::new_v4(),
1693 content: Content {
1694 text: "Hello!".to_string(),
1695 ..Default::default()
1696 },
1697 embedding: None,
1698 metadata: None,
1699 created_at: chrono::Utc::now().timestamp(),
1700 unique: None,
1701 similarity: None,
1702 };
1703
1704 let room = Room {
1705 id: message.room_id,
1706 agent_id: Some(message.agent_id),
1707 name: "Test Room".to_string(),
1708 source: "test".to_string(),
1709 channel_type: ChannelType::Dm,
1710 channel_id: None,
1711 server_id: None,
1712 world_id: uuid::Uuid::new_v4(),
1713 metadata: std::collections::HashMap::new(),
1714 created_at: None,
1715 };
1716
1717 let result = processor.process_message(message, room).await;
1718 if let Err(ref e) = result {
1719 eprintln!("Message processing failed: {:?}", e);
1720 }
1721 assert!(result.is_ok());
1722 }
1723
1724 #[test]
1725 fn test_parse_llm_response_xml_variants() {
1726 let runtime = crate::create_mock_runtime();
1727 let proc = MessageProcessor::new(runtime);
1728 let xml = "<response>\n<thought>think</thought>\n<actions>REPLY,ASK_CLARIFY</actions>\n<text>Hello world</text>\n</response>";
1729 let (thought, text) = proc.parse_llm_response(xml);
1730 assert_eq!(thought.as_deref(), Some("think"));
1731 assert_eq!(text, "Hello world");
1732 }
1733
1734 #[test]
1735 fn test_parse_llm_response_xml_spacing() {
1736 let runtime = crate::create_mock_runtime();
1737 let proc = MessageProcessor::new(runtime);
1738 let xml = "<response> <text> spaced </text> </response>";
1739 let (_thought, text) = proc.parse_llm_response(xml);
1740 assert_eq!(text, "spaced");
1741 }
1742
1743 #[test]
1744 fn test_tone_verbosity_injection() {
1745 let mut state = State::new();
1746 state.set_value("UI_TONE", "friendly");
1747 state.set_value("UI_VERBOSITY", "short");
1748 let tpl = "Tone: {{UI_TONE}} Verbosity: {{UI_VERBOSITY}}";
1749 let rendered = crate::templates::compose_prompt_from_state(&state, tpl).unwrap();
1750 assert!(rendered.contains("friendly"));
1751 assert!(rendered.contains("short"));
1752 }
1753
1754 #[test]
1755 fn test_parse_llm_response_malformed_actions() {
1756 let runtime = crate::create_mock_runtime();
1757 let proc = MessageProcessor::new(runtime);
1758 let xml = "<response><text>hello</text><actions> , , REPLY ,, </actions></response>";
1759 let (_thought, text) = proc.parse_llm_response(xml);
1760 assert_eq!(text, "hello");
1761 }
1762
1763 #[test]
1764 fn test_parse_llm_response_missing_wrapper() {
1765 let runtime = crate::create_mock_runtime();
1766 let proc = MessageProcessor::new(runtime);
1767 let xml = "<text>hello</text>";
1768 let (_thought, text) = proc.parse_llm_response(xml);
1769 assert_eq!(text, "hello");
1770 }
1771
1772 #[test]
1773 fn test_parse_llm_response_multiple_text() {
1774 let runtime = crate::create_mock_runtime();
1775 let proc = MessageProcessor::new(runtime);
1776 let xml = "<response><text>first</text><text>second</text></response>";
1777 let (_thought, text) = proc.parse_llm_response(xml);
1778 assert!(text == "first" || text == "second");
1779 }
1780}
1781
1782