zoey_core/message.rs

//! Message processing pipeline

use crate::streaming::{create_text_stream, StreamHandler, TextStream};
use crate::templates::compose_prompt_from_state;
use crate::training::TrainingCollector;
use crate::types::*;
use crate::{ZoeyError, Result};

use std::sync::OnceLock;
use std::sync::{Arc, RwLock};
use std::time::Instant;
use tracing::{debug, info, warn};

14/// Message processor for handling incoming messages
15pub struct MessageProcessor {
16    runtime: Arc<RwLock<crate::AgentRuntime>>,
17
18    /// Training collector for RLHF and model fine-tuning
19    training_collector: Option<Arc<TrainingCollector>>,
20}
21
22impl MessageProcessor {
23    /// Create a new message processor
24    pub fn new(runtime: Arc<RwLock<crate::AgentRuntime>>) -> Self {
25        Self {
26            runtime,
27            training_collector: None,
28        }
29    }
30
31    /// Create a new message processor with training enabled
32    pub fn with_training(
33        runtime: Arc<RwLock<crate::AgentRuntime>>,
34        training_collector: Arc<TrainingCollector>,
35    ) -> Self {
36        Self {
37            runtime,
38            training_collector: Some(training_collector),
39        }
40    }
41
42    /// Process an incoming message
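    /// A minimal usage sketch (assumes an already-configured `AgentRuntime` behind
    /// `Arc<RwLock<_>>` and pre-built, hypothetical `incoming`/`room` values):
    ///
    /// ```ignore
    /// let processor = MessageProcessor::new(runtime.clone());
    /// let responses = processor.process_message(incoming, room).await?;
    /// for reply in &responses {
    ///     println!("agent reply: {}", reply.content.text);
    /// }
    /// ```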
43    pub async fn process_message(&self, message: Memory, room: Room) -> Result<Vec<Memory>> {
44        let span =
45            tracing::info_span!("message_processing", message_id = %message.id, duration_ms = 0i64);
46        let _enter = span.enter();
        let start = Instant::now();
48        info!(
49            "INTERACTION_REQUEST id={} room_id={} entity_id={} text_len={} text_preview={}",
50            message.id,
51            message.room_id,
52            message.entity_id,
53            message.content.text.len(),
54            message.content.text.chars().take(120).collect::<String>()
55        );
56
        // 1. Store incoming message in database
        info!("INTERACTION_STORE message_id={} table=messages", message.id);
        // Clone the adapter handle out of the runtime locks before awaiting,
        // so no std::sync::RwLock guard is held across the .await below.
        let adapter_opt = self.runtime.read().unwrap().adapter.read().unwrap().clone();
62        if let Some(adapter) = adapter_opt.as_ref() {
63            match adapter.create_memory(&message, "messages").await {
64                Ok(id) => info!("Message stored with ID: {}", id),
65                Err(e) => warn!("Failed to store message: {}", e),
66            }
67        } else {
68            warn!("No database adapter configured - message not stored");
69        }
70
71        // 2. Determine if should respond (with delayed reassessment window)
72        debug!("Determining if should respond");
73        let should_respond = self.should_respond(&message, &room).await?;
74        // Merge follow-up if pending within window, otherwise start window for incomplete
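        // Window semantics, as implemented below: if a pending fragment exists and the
        // window has not expired, merge it into this message and clear the window; if
        // the window expired, just clear it; if nothing is pending but the UI flagged
        // the text as incomplete, start a window and defer responding entirely.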
75        let mut message = message; // shadow for potential text update
76        {
77            let mut rt = self.runtime.write().unwrap();
78            let enabled = rt
79                .get_setting("AUTONOMOUS_DELAYED_REASSESSMENT")
80                .and_then(|v| v.as_bool())
81                .unwrap_or(false);
82            if enabled {
83                if let Some((ts, prev)) =
84                    crate::utils::delayed_reassessment::DelayedReassessment::pending(
85                        &rt,
86                        message.room_id,
87                    )
88                {
89                    if crate::utils::delayed_reassessment::DelayedReassessment::should_wait(ts) {
90                        let merged = crate::utils::delayed_reassessment::DelayedReassessment::merge(
91                            &prev,
92                            &message.content.text,
93                        );
94                        crate::utils::delayed_reassessment::DelayedReassessment::clear(
95                            &mut rt,
96                            message.room_id,
97                        );
98                        message.content.text = merged;
99                    } else {
100                        crate::utils::delayed_reassessment::DelayedReassessment::clear(
101                            &mut rt,
102                            message.room_id,
103                        );
104                    }
105                } else {
106                    let incomplete = rt
107                        .get_setting("ui:incomplete")
108                        .and_then(|v| v.as_bool())
109                        .unwrap_or(false);
110                    if incomplete {
111                        crate::utils::delayed_reassessment::DelayedReassessment::start(
112                            &mut rt,
113                            message.room_id,
114                            &message.content.text,
115                        );
116                        info!("Deferred response via delayed reassessment window: room_id={} message_id={}", message.room_id, message.id);
117                        return Ok(vec![]);
118                    }
119                }
120            }
121        }
122
123        if !should_respond {
124            info!("Decided not to respond to message");
125            return Ok(vec![]);
126        }
127
128        // Phase 0: Preprocess message with mini-pipelines
129        {
130            let enabled = {
131                let rt = self.runtime.read().unwrap();
132                rt.get_setting("ui:phase0_enabled")
133                    .and_then(|v| v.as_bool())
134                    .unwrap_or(true)
135            };
136            if enabled {
137                let pre = crate::preprocessor::Phase0Preprocessor::new(self.runtime.clone());
138                let phase0 = pre.execute(&message).await;
139                if let Ok(res) = phase0 {
140                    if let Some(tone) = res.tone.as_ref() {
141                        let mut rt = self.runtime.write().unwrap();
142                        rt.set_setting("ui:tone", serde_json::json!(tone), false);
143                    }
144                    if let Some(lang) = res.language.as_ref() {
145                        let mut rt = self.runtime.write().unwrap();
146                        rt.set_setting("ui:language", serde_json::json!(lang), false);
147                    }
148                    if let Some(intent) = res.intent.as_ref() {
149                        let mut rt = self.runtime.write().unwrap();
150                        rt.set_setting("ui:intent", serde_json::json!(intent), false);
151                    }
152                    let mut rt = self.runtime.write().unwrap();
153                    if !res.topics.is_empty() {
154                        rt.set_setting("ui:topics", serde_json::json!(res.topics), false);
155                    }
156                    if !res.keywords.is_empty() {
157                        rt.set_setting("ui:keywords", serde_json::json!(res.keywords), false);
158                    }
159                    if !res.entities.is_empty() {
160                        rt.set_setting("ui:entities", serde_json::json!(res.entities), false);
161                    }
162                    if let Some(comp) = res.complexity.as_ref() {
163                        rt.set_setting(
164                            "ui:complexity",
165                            serde_json::to_value(comp).unwrap_or(serde_json::Value::Null),
166                            false,
167                        );
168                    }
169                    let room_id = message.room_id;
170                    let avg_key = format!("rhythm:{}:avg_len", room_id);
171                    let win_key = format!("rhythm:{}:window", room_id);
172                    let mut avg = rt
173                        .get_setting(&avg_key)
174                        .and_then(|v| v.as_f64())
175                        .unwrap_or(0.0);
176                    let count = rt
177                        .get_setting(&win_key)
178                        .and_then(|v| v.as_array().map(|a| a.len()))
179                        .unwrap_or(0);
180                    let len = message.content.text.len() as f64;
181                    avg = if count == 0 {
182                        len
183                    } else {
184                        (avg * (count as f64) + len) / ((count as f64) + 1.0)
185                    };
186                    rt.set_setting(&avg_key, serde_json::json!(avg), false);
187                    let mut window = rt
188                        .get_setting(&win_key)
189                        .and_then(|v| v.as_array().cloned())
190                        .unwrap_or_default();
191                    window.push(serde_json::json!(chrono::Utc::now().timestamp()));
192                    while window.len() > 10 {
193                        window.remove(0);
194                    }
195                    rt.set_setting(&win_key, serde_json::json!(window), false);
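                    // Velocity is messages per minute over the sliding window, e.g.
                    // 6 timestamps spanning 120 seconds give 6 / (120 / 60) = 3 msg/min,
                    // which maps to the "brief" suggested response length below.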
196                    let velocity = if window.len() >= 2 {
197                        let first = window.first().and_then(|v| v.as_i64()).unwrap_or(0);
198                        let last = window.last().and_then(|v| v.as_i64()).unwrap_or(0);
199                        let dt = (last - first) as f64;
200                        if dt <= 0.0 {
201                            0.0
202                        } else {
203                            (window.len() as f64) / (dt / 60.0)
204                        }
205                    } else {
206                        0.0
207                    };
208                    rt.set_setting(
209                        &format!("rhythm:{}:velocity", room_id),
210                        serde_json::json!(velocity),
211                        false,
212                    );
213                    let prev_topics = rt
214                        .get_setting(&format!("rhythm:{}:recentTopics", room_id))
215                        .and_then(|v| v.as_array().cloned())
216                        .unwrap_or_default();
217                    rt.set_setting(
218                        &format!("rhythm:{}:recentTopics", room_id),
219                        serde_json::json!(res.topics.clone()),
220                        false,
221                    );
222                    let prev: Vec<String> = prev_topics
223                        .iter()
224                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
225                        .collect();
226                    let overlap = if prev.is_empty() || res.topics.is_empty() {
227                        0.0
228                    } else {
229                        let set_prev: std::collections::HashSet<_> = prev.iter().collect();
230                        let inter =
231                            res.topics.iter().filter(|t| set_prev.contains(t)).count() as f64;
232                        inter / (res.topics.len() as f64)
233                    };
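                    // e.g. prev = ["rust", "tokio"] and current topics = ["rust", "async", "traits"]:
                    // intersection = 1, overlap = 1 / 3 ≈ 0.33, so no topic shift is flagged (threshold 0.2).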
234                    let drift = overlap < 0.2;
235                    rt.set_setting("ui:possibleTopicShift", serde_json::json!(drift), false);
236                    let suggested = if velocity > 5.0 {
237                        "terse"
238                    } else if velocity > 2.0 {
239                        "brief"
240                    } else if avg > 300.0 {
241                        "detailed"
242                    } else {
243                        "moderate"
244                    };
245                    rt.set_setting(
246                        "ui:suggestedResponseLength",
247                        serde_json::json!(suggested),
248                        false,
249                    );
250                }
251            }
252        }
253
254        // 3. Compose state from providers (with RuntimeRef)
255        debug!("Composing state from providers");
256        let mut state = self.compose_state_with_runtime_ref(&message).await?;
257
258        // Providers include curated memories; no direct service downcasting
259
260        // 4. Generate response using LLM
261        debug!("Generating response with LLM");
262        let response_text = self.generate_response(&message, &state).await?;
263        info!(
264            "INTERACTION_RESPONSE room_id={} text_preview={}",
265            message.room_id,
266            response_text.chars().take(120).collect::<String>()
267        );
268
269        // 5. Process actions (determine which actions to take based on response)
270        debug!("Processing actions");
271        let _action_results = self.process_actions(&message, &state).await?;
272
273        // 6. Create response memories
274        let agent_id = {
275            let rt = self.runtime.read().unwrap();
276            rt.agent_id
277        };
278
279        let response_memories = vec![Memory {
280            id: uuid::Uuid::new_v4(),
281            entity_id: agent_id,
282            agent_id,
283            room_id: message.room_id,
284            content: Content {
285                text: response_text.clone(),
286                source: message.content.source.clone(),
287                ..Default::default()
288            },
289            embedding: None,
290            metadata: None,
291            created_at: chrono::Utc::now().timestamp(),
292            unique: Some(false),
293            similarity: None,
294        }];
295
296        // 7. Record training sample early and attach sample_id to response metadata
297        let mut response_memories = response_memories;
298        let mut recorded_sample_id: Option<uuid::Uuid> = None;
299        {
300            let fast_mode = {
301                let rt = self.runtime.read().unwrap();
302                rt.get_setting("ui:fast_mode")
303                    .and_then(|v| v.as_bool())
304                    .unwrap_or(false)
305            };
306            if !fast_mode {
307                if let Some(ref collector) = self.training_collector {
308                    if let Some(response_mem) = response_memories.first() {
309                        let thought = response_mem.content.thought.clone();
310                        match collector
311                            .record_conversation_turn(&message, response_mem, thought, &state)
312                            .await
313                        {
314                            Ok(id) => {
315                                recorded_sample_id = Some(id);
316                                let meta = MemoryMetadata {
317                                    memory_type: Some("message".to_string()),
318                                    entity_name: None,
319                                    data: {
320                                        let mut m = std::collections::HashMap::new();
321                                        m.insert(
322                                            "training_sample_id".to_string(),
323                                            serde_json::json!(id.to_string()),
324                                        );
325                                        m
326                                    },
327                                };
328                                response_memories[0].metadata = Some(meta);
329                            }
                            Err(e) => {
                                debug!("Failed to record training sample: {}", e);
                            }
331                        }
332                    }
333                }
334            }
335        }
336
337        // 8. Run evaluators (skip in fast mode)
338        {
339            let fast_mode = {
340                let rt = self.runtime.read().unwrap();
341                rt.get_setting("ui:fast_mode")
342                    .and_then(|v| v.as_bool())
343                    .unwrap_or(false)
344            };
345            if fast_mode {
346                debug!("Fast mode enabled: skipping evaluators");
347            } else {
348                info!(
349                    "INTERACTION_EVALUATORS_START room_id={} message_id={} responses={} ",
350                    message.room_id,
351                    message.id,
352                    response_memories.len()
353                );
                self.evaluate(&message, &state, true, &response_memories)
                    .await?;
358                info!(
359                    "INTERACTION_EVALUATORS_DONE room_id={} message_id={}",
360                    message.room_id, message.id
361                );
362            }
363        }
364
365        // 9. Store response messages in database (production)
366        for response in &response_memories {
367            if let Some(adapter) = self
368                .runtime
369                .read()
370                .unwrap()
371                .adapter
372                .read()
373                .unwrap()
374                .as_ref()
375            {
376                match adapter.create_memory(response, "messages").await {
377                    Ok(id) => info!("INTERACTION_STORE response_id={} table=messages", id),
378                    Err(e) => warn!("Failed to store response: {}", e),
379                }
380            }
381        }
382
383        // 10. Apply evaluator review signals to training collector
384        {
385            let fast_mode = {
386                let rt = self.runtime.read().unwrap();
387                rt.get_setting("ui:fast_mode")
388                    .and_then(|v| v.as_bool())
389                    .unwrap_or(false)
390            };
391            if !fast_mode {
392                if let (Some(ref collector), Some(sample_id), Some(resp)) = (
393                    self.training_collector.as_ref(),
394                    recorded_sample_id,
395                    response_memories.first(),
396                ) {
397                    let key = format!("training:review:{}", resp.id);
398                    let (score_opt, note_opt) = {
399                        let rt = self.runtime.read().unwrap();
400                        let score = rt
401                            .get_setting(&(key.clone() + ":score"))
402                            .and_then(|v| v.as_f64())
403                            .map(|v| v as f32);
404                        let note = rt
405                            .get_setting(&(key.clone() + ":note"))
406                            .and_then(|v| v.as_str().map(|s| s.to_string()));
407                        (score, note)
408                    };
409                    if let Some(review_score) = score_opt {
410                        let rlhf_enabled = collector.is_rlhf_enabled();
411                        if rlhf_enabled {
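                            // Map the [0, 1] review score onto the [-1, 1] feedback
                            // range expected by RLHF, e.g. 0.75 -> 0.5 and 0.5 -> 0.0.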
412                            let mapped = (review_score * 2.0) - 1.0;
413                            let _ = collector.add_feedback(sample_id, mapped, note_opt).await;
414                        } else {
415                            let _ = collector
416                                .add_review(sample_id, review_score, note_opt)
417                                .await;
418                        }
419                    }
420                }
421            }
422        }
423
424        // 11. Update active-thread TTL for this room
425        {
426            let mut rt = self.runtime.write().unwrap();
427            let key = format!("ui:lastAddressed:{}", message.room_id);
428            rt.set_setting(
429                &key,
430                serde_json::json!(chrono::Utc::now().timestamp()),
431                false,
432            );
433        }
434
435        // 12. Emit MESSAGE_SENT event (production)
436        debug!("Emitting MESSAGE_SENT event");
437        let handler_count = {
438            let rt = self.runtime.read().unwrap();
439            let events = rt.events.read().unwrap();
440            events.get("MESSAGE_SENT").map(|h| h.len()).unwrap_or(0)
441        };
442
443        if handler_count > 0 {
444            debug!("Would invoke {} MESSAGE_SENT event handlers", handler_count);
445        }
446
447        info!(
448            "✓ Message processing complete - {} response(s) generated and stored",
449            response_memories.len()
450        );
        let elapsed_ms = start.elapsed().as_millis() as i64;
        span.record("duration_ms", &elapsed_ms);
453        Ok(response_memories)
454    }
455
456    /// Determine if agent should respond to message
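    ///
    /// Decision rule: always respond in DM, voice-DM, and API rooms; in group rooms
    /// respond when the message is explicitly addressed to the agent, mentions the
    /// agent's name, reads like a directive or question, or arrives within the
    /// 10-minute active-thread TTL for that room.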
457    async fn should_respond(&self, message: &Memory, room: &Room) -> Result<bool> {
458        // Respond to DMs always, otherwise use mention/intent + active-thread TTL
459        match room.channel_type {
460            ChannelType::Dm | ChannelType::VoiceDm | ChannelType::Api => Ok(true),
461            _ => {
462                let addressed = message
463                    .content
464                    .metadata
465                    .get("addressed_to_me")
466                    .and_then(|v| v.as_bool())
467                    .unwrap_or(false);
468                if addressed {
469                    return Ok(true);
470                }
471                let rt = self.runtime.read().unwrap();
472                let agent_name = &rt.character.name;
473                let text_lc = message.content.text.to_lowercase();
474                let mentioned = text_lc.contains(&agent_name.to_lowercase());
475                let intent_directive = text_lc.starts_with("please ")
476                    || text_lc.starts_with("can you")
477                    || text_lc.contains("help me")
478                    || text_lc.contains("what is")
479                    || text_lc.contains("how do");
480                let ttl_ok = {
481                    let key = format!("ui:lastAddressed:{}", message.room_id);
482                    if let Some(ts) = rt.get_setting(&key).and_then(|v| v.as_i64()) {
483                        let now = chrono::Utc::now().timestamp();
484                        let elapsed = now - ts;
485                        elapsed <= 600 // 10 minutes
486                    } else {
487                        false
488                    }
489                };
490                Ok(mentioned || intent_directive || ttl_ok)
491            }
492        }
493    }
494
495    /// Generate response text using LLM (supports OpenAI, Anthropic, Local LLM)
496    async fn generate_response(&self, message: &Memory, state: &State) -> Result<String> {
497        // Compose prompt from state using template
498        let template_owned = {
499            let rt = self.runtime.read().unwrap();
500            if let Some(ref templates) = rt.character.templates {
501                templates.message_handler_template.clone()
502            } else {
503                None
504            }
505        };
506        let template_str = template_owned
507            .as_deref()
508            .unwrap_or(crate::templates::MESSAGE_HANDLER_TEMPLATE);
509        let mut prompt = compose_prompt_from_state(&state, template_str).unwrap_or_else(|_| {
510            // Fallback template if state composition fails
511            format!(
512                "You are ZoeyBot, a helpful AI assistant.\n\
513                        User message: {}\n\
514                        Respond helpfully in XML format with <thought> and <text> tags.",
515                message.content.text
516            )
517        });
518
519        // Template already contains current message via providers; avoid redundant suffix
520
521        let streaming_enabled = {
522            let rt = self.runtime.read().unwrap();
523            rt.get_setting("ui:streaming")
524                .and_then(|v| v.as_bool())
525                .unwrap_or_else(|| {
526                    std::env::var("UI_STREAMING")
527                        .map(|v| v.eq_ignore_ascii_case("true"))
528                        .unwrap_or(false)
529                })
530        };
531
532        // Adaptive temperature based on simple intent heuristics
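        // e.g. "what is a lifetime?" matches the factual heuristics (0.4), while
        // "brainstorm ideas for the launch" matches the creative ones (0.8);
        // everything else keeps the 0.7 default.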
533        {
534            let text_lc = message.content.text.to_lowercase();
535            let factual = text_lc.contains('?')
536                || text_lc.starts_with("what")
537                || text_lc.starts_with("how")
538                || text_lc.starts_with("why")
539                || text_lc.starts_with("when")
540                || text_lc.starts_with("where");
541            let creative = text_lc.contains("brainstorm")
542                || text_lc.contains("ideas")
543                || text_lc.contains("suggestions")
544                || text_lc.contains("think of");
545            let target_temp = if factual {
546                0.4
547            } else if creative {
548                0.8
549            } else {
550                0.7
551            };
552            let mut rt = self.runtime.write().unwrap();
553            rt.set_setting("ui:temperature", serde_json::json!(target_temp), false);
554        }
555
556        // Optional prompt debug (disabled by default)
557        let prompt_debug = {
558            let rt = self.runtime.read().unwrap();
559            rt.get_setting("ui:prompt_debug")
560                .and_then(|v| v.as_bool())
561                .unwrap_or_else(|| {
562                    std::env::var("UI_PROMPT_DEBUG")
563                        .map(|v| v.eq_ignore_ascii_case("true"))
564                        .unwrap_or(false)
565                })
566        };
567        if prompt_debug {
568            debug!("╔════════════════════════════════════════════════════════════════");
569            debug!("║ LLM PROMPT CONTEXT ({} chars)", prompt.len());
570            debug!("╠════════════════════════════════════════════════════════════════");
571            for (i, line) in prompt.lines().take(50).enumerate() {
572                debug!("║ {:3} │ {}", i + 1, line);
573            }
574            if prompt.lines().count() > 50 {
575                debug!("║ ... ({} more lines)", prompt.lines().count() - 50);
576            }
577            debug!("╚════════════════════════════════════════════════════════════════");
578        }
579
580        // Try to use registered model handlers (OpenAI, Anthropic, LocalLLM)
581        // Handlers are priority-sorted: Local (200) > Cloud (100)
582        let raw_response = self.call_llm(&prompt).await?;
583
584        info!("LLM response received ({} chars)", raw_response.len());
585
586        // Parse XML response to extract thought and text
587        let (thought, response_text_raw) = self.parse_llm_response(&raw_response);
588        let mut response_text = {
589            use regex::Regex;
590            let mut t = response_text_raw.clone();
591            let action_re =
592                Regex::new(r"(?i)^\s*(REPLY|SEND_MESSAGE|IGNORE|NONE)\b[:\-]?\s*").unwrap();
593            t = action_re.replace_all(&t, "").to_string();
594            let action_line_re =
595                Regex::new(r"(?mi)^\s*(REPLY|SEND_MESSAGE|IGNORE|NONE)\b[:\-]?.*$\n?").unwrap();
596            t = action_line_re.replace_all(&t, "").to_string();
597            let html_re = Regex::new(r"(?is)</?[^>]+>").unwrap();
598            t = html_re.replace_all(&t, "").to_string();
599            // Normalize double newlines introduced by character guidelines to single newlines in user-facing text
600            let dbl_nl = Regex::new(r"\n\n+").unwrap();
601            t = dbl_nl.replace_all(&t, "\n").to_string();
602            let meta_re = Regex::new(
603                r"(?mi)^\s*(The user .*|As an AI.*|I will .*|I can .* assist.*)\.?\s*$\n?",
604            )
605            .unwrap();
606            t = meta_re.replace_all(&t, "").to_string();
607            let leading_punct = Regex::new(r"^[\s,.;:!\-]+").unwrap();
608            t = leading_punct.replace_all(&t, "").to_string();
609            t.trim().to_string()
610        };
611
612        // Post-completion continuation: if the final character is not terminal punctuation, request a short continuation
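        // Only replies of at least 80 bytes are checked, e.g. "...so the next step would be to"
        // ends without terminal punctuation and is treated as truncated.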
613        let looks_truncated = {
614            let s = response_text.trim_end();
615            if s.len() < 80 {
616                false
617            } else {
618                match s.chars().rev().find(|c| !c.is_whitespace()) {
619                    Some(c) => {
620                        let enders = ['.', '!', '?', '”', '’', '"', '\'', ')', ']', '}'];
621                        !enders.contains(&c)
622                    }
623                    None => false,
624                }
625            }
626        };
627        if looks_truncated {
628            if streaming_enabled {
629                if let Ok(cont) = self.continue_response(&response_text).await {
630                    if !cont.is_empty() {
631                        response_text = format!("{} {}", response_text, cont);
632                    }
633                }
634            } else {
635                // Non-streaming (e.g., Discord): finish the sentence locally without a second LLM call
636                let trimmed = response_text.trim_end();
637                response_text = format!("{}.", trimmed);
638            }
639        }
640
641        // Store thought for future use (learning/reflection/training)
642        if let Some(ref thought_text) = thought {
643            if let Some(ref collector) = self.training_collector {
644                // Use training collector to store thought
645                use crate::runtime_ref::RuntimeRef;
646                let runtime_ref = Arc::new(RuntimeRef::new(&self.runtime));
647                let runtime_any = runtime_ref.as_any_arc();
648
649                let quality_score = 0.7; // Default quality for thoughts
650                match collector
651                    .store_thought(runtime_any, thought_text, message, quality_score)
652                    .await
653                {
654                    Ok(id) => debug!("Stored thought with ID: {}", id),
655                    Err(e) => warn!("Failed to store thought: {}", e),
656                }
657            } else {
                // Fallback: store directly via the legacy path
                self.store_thought_direct(thought_text, message).await?;
660            }
661        }
662
663        Ok(response_text)
664    }
665
666    async fn continue_response(&self, prev_text: &str) -> Result<String> {
667        let prompt = format!(
668            "Continue the assistant's previous response naturally. Do not repeat content.\n\nPrevious:\n{}\n\nRespond in XML format:\n<response>\n<thought></thought>\n<actions>REPLY</actions>\n<text>",
669            prev_text
670        );
671        let raw = self.call_llm(&prompt).await?;
672        let (_, text) = self.parse_llm_response(&raw);
673        let cleaned = {
674            use regex::Regex;
675            let mut t = text;
676            let html_re = Regex::new(r"(?is)</?[^>]+>").unwrap();
677            t = html_re.replace_all(&t, "").to_string();
678            let action_re =
679                Regex::new(r"(?i)^\s*(REPLY|SEND_MESSAGE|IGNORE|NONE)\b[:\-]?\s*").unwrap();
680            t = action_re.replace_all(&t, "").to_string();
681            let leading = Regex::new(r"^[\s,.;:!\-]+").unwrap();
682            t = leading.replace_all(&t, "").to_string();
683            t.trim().to_string()
684        };
685        Ok(cleaned)
686    }
687
688    /// Call LLM using available providers (OpenAI, Anthropic, or Local)
689    async fn call_llm(&self, prompt: &str) -> Result<String> {
690        static COST_CALC: OnceLock<crate::planner::cost::CostCalculator> = OnceLock::new();
691        // Streaming support: when ui:streaming is enabled and a streaming-capable provider is selected,
692        // we will stream tokens into a buffer and return the final text.
693        // Get configured provider preference and available handlers
694        let (preferred_provider, model_handlers) = {
695            let rt = self.runtime.read().unwrap();
696            let models = rt.models.read().unwrap();
697
698            // Get user's preferred provider from character settings
699            let provider_pref = rt
700                .get_setting("model_provider")
701                .and_then(|v| v.as_str().map(|s| s.to_string()));
702
703            // Log available providers for debugging
704            for (model_type, handlers) in models.iter() {
705                debug!(
706                    "Available model type '{}': {} handler(s)",
707                    model_type,
708                    handlers.len()
709                );
710                for (idx, handler) in handlers.iter().enumerate() {
711                    debug!(
712                        "  [{}] {} (priority: {})",
713                        idx, handler.name, handler.priority
714                    );
715                }
716            }
717
718            (provider_pref, models.get("TEXT_LARGE").cloned())
719        };
720
721        if let Some(handlers) = model_handlers {
722            // If user specified a provider, try to find it
723            let provider = if let Some(pref) = preferred_provider.as_ref() {
724                info!("🎯 Looking for preferred provider: {}", pref);
725
726                // Normalize provider aliases for matching
727                // "ollama" and "local" should map to "local-llm"
728                let pref_lc = pref.to_lowercase();
                let is_local_alias = matches!(
                    pref_lc.as_str(),
                    "ollama" | "local" | "llama" | "llamacpp" | "localai"
                );
730
731                // Try to find matching provider
732                let matching = handlers.iter().find(|h| {
733                    let h_lc = h.name.to_lowercase();
734                    // Direct match
735                    h_lc.contains(&pref_lc) || pref_lc.contains(&h_lc)
736                    // Local provider alias matching
737                    || (is_local_alias && (h_lc.contains("local") || h_lc.contains("llm")))
738                });
739
740                if let Some(matched) = matching {
741                    info!("✓ Found matching provider: {}", matched.name);
742                    Some(matched.clone())
743                } else {
744                    warn!(
745                        "⚠️  Preferred provider '{}' not found, using highest priority",
746                        pref
747                    );
748                    handlers.first().cloned()
749                }
750            } else {
751                // No preference, use highest priority
752                handlers.first().cloned()
753            };
754
755            let do_race = {
756                let rt = self.runtime.read().unwrap();
757                let race_setting = rt
758                    .get_setting("ui:provider_racing")
759                    .and_then(|v| v.as_bool())
760                    .unwrap_or_else(|| {
761                        std::env::var("UI_PROVIDER_RACING")
762                            .map(|v| v.eq_ignore_ascii_case("true"))
763                            .unwrap_or(false)
764                    });
765                let streaming_ctx = rt
766                    .get_setting("ui:streaming")
767                    .and_then(|v| v.as_bool())
768                    .unwrap_or(false);
769                race_setting && streaming_ctx && preferred_provider.is_none() && handlers.len() > 1
770            };
771
772            if do_race {
773                use tokio::task::JoinSet;
774                let mut js: JoinSet<Result<String>> = JoinSet::new();
775                let max_candidates = 3usize.min(handlers.len());
776                for p in handlers.iter().take(max_candidates) {
777                    let name_lc = p.name.to_lowercase();
778                    let (preferred_model, temp, max_tokens) = {
779                        let rt = self.runtime.read().unwrap();
780                        let model = if name_lc.contains("openai") {
781                            rt.get_setting("OPENAI_MODEL")
782                                .and_then(|v| v.as_str().map(|s| s.to_string()))
783                        } else if name_lc.contains("anthropic") || name_lc.contains("claude") {
784                            rt.get_setting("ANTHROPIC_MODEL")
785                                .and_then(|v| v.as_str().map(|s| s.to_string()))
786                        } else {
787                            rt.get_setting("LOCAL_LLM_MODEL")
788                                .and_then(|v| v.as_str().map(|s| s.to_string()))
789                        };
790                        let temp = rt
791                            .get_setting("ui:temperature")
792                            .and_then(|v| v.as_f64().map(|f| f as f32))
793                            .or_else(|| {
794                                rt.get_setting("temperature")
795                                    .and_then(|v| v.as_f64().map(|f| f as f32))
796                            })
797                            .unwrap_or(0.7);
798                        let base_tokens = if name_lc.contains("openai")
799                            || name_lc.contains("anthropic")
800                            || name_lc.contains("claude")
801                        {
802                            rt.get_setting("max_tokens")
803                                .and_then(|v| v.as_u64().map(|u| u as usize))
804                                .unwrap_or(150)
805                        } else {
806                            rt.get_setting("LOCAL_LLM_MAX_TOKENS")
807                                .and_then(|v| v.as_u64().map(|u| u as usize))
808                                .or_else(|| {
809                                    rt.get_setting("max_tokens")
810                                        .and_then(|v| v.as_u64().map(|u| u as usize))
811                                })
812                                .unwrap_or(150)
813                        };
814                        (model, temp, base_tokens)
815                    };
816                    let params = GenerateTextParams {
817                        prompt: prompt.to_string(),
818                        max_tokens: Some(max_tokens),
819                        temperature: Some(temp),
820                        top_p: None,
821                        stop: None,
822                        model: preferred_model,
823                        frequency_penalty: None,
824                        presence_penalty: None,
825                    };
826                    let mh_params = ModelHandlerParams {
827                        runtime: Arc::new(()),
828                        params,
829                    };
830                    js.spawn((p.handler)(mh_params));
831                }
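                // First successful completion wins; the losing tasks are aborted when the
                // JoinSet is dropped. If every candidate fails, control falls through to
                // the fallback path at the end of this function.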
832                while let Some(res) = js.join_next().await {
833                    if let Ok(Ok(text)) = res {
834                        return Ok(text);
835                    }
836                }
837            } else if let Some(provider) = provider {
838                info!(
839                    "🤖 Using LLM provider: {} (priority: {})",
840                    provider.name, provider.priority
841                );
842
843                // Get model preferences from runtime settings based on provider
844                let (preferred_model, temp, max_tokens) = {
845                    let rt = self.runtime.read().unwrap();
846
847                    // Select model based on provider name
848                    let model = if provider.name.to_lowercase().contains("openai") {
849                        rt.get_setting("OPENAI_MODEL")
850                            .and_then(|v| v.as_str().map(|s| s.to_string()))
851                            .or_else(|| {
852                                rt.get_setting("openai_model")
853                                    .and_then(|v| v.as_str().map(|s| s.to_string()))
854                            })
855                    } else if provider.name.to_lowercase().contains("anthropic")
856                        || provider.name.to_lowercase().contains("claude")
857                    {
858                        rt.get_setting("ANTHROPIC_MODEL")
859                            .and_then(|v| v.as_str().map(|s| s.to_string()))
860                            .or_else(|| {
861                                rt.get_setting("anthropic_model")
862                                    .and_then(|v| v.as_str().map(|s| s.to_string()))
863                            })
864                    } else {
865                        // Local LLM
866                        rt.get_setting("LOCAL_LLM_MODEL")
867                            .and_then(|v| v.as_str().map(|s| s.to_string()))
868                            .or_else(|| {
869                                rt.get_setting("local_llm_model")
870                                    .and_then(|v| v.as_str().map(|s| s.to_string()))
871                            })
872                    };
873
874                    // Prefer ui:temperature if set by adaptive heuristics; fallback to generic temperature
875                    let temp = rt
876                        .get_setting("ui:temperature")
877                        .and_then(|v| v.as_f64().map(|f| f as f32))
878                        .or_else(|| {
879                            rt.get_setting("temperature")
880                                .and_then(|v| v.as_f64().map(|f| f as f32))
881                        })
882                        .unwrap_or(0.7);
883                    let base_tokens = if provider.name.to_lowercase().contains("openai")
884                        || provider.name.to_lowercase().contains("anthropic")
885                        || provider.name.to_lowercase().contains("claude")
886                    {
887                        rt.get_setting("max_tokens")
888                            .and_then(|v| v.as_u64().map(|u| u as usize))
889                            .unwrap_or(150)
890                    } else {
891                        rt.get_setting("LOCAL_LLM_MAX_TOKENS")
892                            .and_then(|v| v.as_u64().map(|u| u as usize))
893                            .or_else(|| {
894                                rt.get_setting("max_tokens")
895                                    .and_then(|v| v.as_u64().map(|u| u as usize))
896                            })
897                            .unwrap_or(150)
898                    };
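                    // Verbosity scaling example with base_tokens = 150: "short" ->
                    // (150 * 0.6).max(32) = 90, "long"/"verbose" -> (150 * 1.5).min(300) = 225,
                    // a numeric setting is used as-is, anything else keeps the base.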
899                    let mut tokens = {
900                        let v = rt.get_setting("ui:verbosity");
901                        if let Some(val) = v {
902                            if let Some(n) = val.as_u64() {
903                                n as usize
904                            } else if let Some(s) = val.as_str() {
905                                match s.to_lowercase().as_str() {
906                                    "short" => ((base_tokens as f64 * 0.6) as usize).max(32),
907                                    "normal" => base_tokens,
908                                    "long" | "verbose" => {
909                                        ((base_tokens as f64 * 1.5) as usize).min(base_tokens * 2)
910                                    }
911                                    _ => base_tokens,
912                                }
913                            } else {
914                                base_tokens
915                            }
916                        } else {
917                            base_tokens
918                        }
919                    };
920
921                    // Dynamically adjust tokens to avoid output cut-off using model pricing and prompt size
922                    // Apply ONLY when streaming is enabled to avoid delaying first response in non-streaming contexts
923                    let streaming_ctx = rt
924                        .get_setting("ui:streaming")
925                        .and_then(|v| v.as_bool())
926                        .unwrap_or(false);
927                    let avoid_cutoff = rt
928                        .get_setting("ui:avoid_cutoff")
929                        .and_then(|v| v.as_bool())
930                        .unwrap_or(true);
931                    if avoid_cutoff {
932                        use crate::planner::tokens::TokenCounter;
933                        let calc = COST_CALC.get_or_init(crate::planner::cost::CostCalculator::new);
934                        // Determine model key for pricing lookup
                        let model_key = model.clone().or_else(|| {
936                            let name = provider.name.to_lowercase();
937                            if name.contains("openai") {
938                                Some("gpt-4o".to_string())
939                            } else if name.contains("anthropic") || name.contains("claude") {
940                                Some("claude-3.5-sonnet".to_string())
941                            } else {
942                                Some("local".to_string())
943                            }
944                        });
945                        if let Some(model_name) = model_key.as_ref() {
946                            if let Some(pricing) = calc.get_pricing(model_name) {
947                                if streaming_ctx {
948                                    // Favor maximal output budget to avoid truncation in streaming
949                                    tokens = pricing.max_output_tokens.max(tokens).max(256);
950                                } else {
951                                    // Non-streaming: raise floor without prompt estimation to avoid latency
952                                    tokens = tokens.max(pricing.max_output_tokens).max(512);
953                                }
954                            } else {
955                                // Conservative defaults
956                                tokens = if streaming_ctx {
957                                    tokens.max(2048)
958                                } else {
959                                    tokens.max(1024)
960                                };
961                            }
962                        }
963                    }
964                    (model, temp, tokens)
965                };
966
967                let params = GenerateTextParams {
968                    prompt: prompt.to_string(),
969                    max_tokens: Some(max_tokens),
970                    temperature: Some(temp),
971                    top_p: None,
972                    stop: None,
973                    model: preferred_model, // Pass model from settings!
974                    frequency_penalty: None,
975                    presence_penalty: None,
976                };
977
978                let model_params = ModelHandlerParams {
979                    runtime: Arc::new(()),
980                    params,
981                };
982
983                debug!(
984                    "Calling model handler: {} (temp: {}, max_tokens: {})",
985                    provider.name, temp, max_tokens
986                );
987
988                // Check if streaming is enabled
989                let streaming_enabled = {
990                    let rt = self.runtime.read().unwrap();
991                    rt.get_setting("ui:streaming")
992                        .and_then(|v| v.as_bool())
993                        .unwrap_or(false)
994                };
995
996                if streaming_enabled && !provider.name.to_lowercase().contains("openai") {
997                    // Use local streaming via Ollama if possible
998                    match self.call_ollama_direct(prompt).await {
999                        Ok(text) => return Ok(text),
1000                        Err(e) => {
1001                            warn!(
1002                                "Streaming call failed: {}. Falling back to non-streaming.",
1003                                e
1004                            );
1005                        }
1006                    }
1007                }
1008
1009                match (provider.handler)(model_params).await {
1010                    Ok(text) => {
1011                        info!(
1012                            "✓ LLM response received via {} ({} chars)",
1013                            provider.name,
1014                            text.len()
1015                        );
1016                        debug!("╔════════════════════════════════════════════════════════════════");
1017                        debug!(
1018                            "║ LLM RESPONSE from {} ({} chars)",
1019                            provider.name,
1020                            text.len()
1021                        );
1022                        debug!("╠════════════════════════════════════════════════════════════════");
1023                        for (i, line) in text.lines().take(20).enumerate() {
1024                            debug!("║ {:3} │ {}", i + 1, line);
1025                        }
1026                        if text.lines().count() > 20 {
1027                            debug!("║ ... ({} more lines)", text.lines().count() - 20);
1028                        }
1029                        debug!("╚════════════════════════════════════════════════════════════════");
1030                        return Ok(text);
1031                    }
1032                    Err(e) => {
1033                        warn!("⚠️  Model handler {} failed: {}", provider.name, e);
1034                        warn!("Trying fallback method...");
1035                    }
1036                }
1037            } else {
1038                warn!("No model handlers registered for TEXT_LARGE");
1039            }
1040        } else {
1041            warn!("No model handlers found in registry");
1042        }
1043
1044        // Fallback behavior: only use Ollama if local provider is preferred or no preference
1045        match preferred_provider.as_ref().map(|s| s.to_lowercase()) {
1046            Some(pref)
1047                if pref.contains("local") || pref.contains("ollama") || pref.contains("llama") =>
1048            {
1049                warn!(
1050                    "Falling back to direct Ollama call (preferred provider: {})",
1051                    pref
1052                );
1053                self.call_ollama_direct(prompt).await
1054            }
            Some(pref) => {
                // Graceful fallback when the preferred provider is unavailable:
                // log the miss and return a minimal, well-formed XML response.
                warn!(
                    "Preferred provider '{}' unavailable; returning minimal fallback reply",
                    pref
                );
                let safe_reply = "<response><thought>Fallback local reasoning</thought><actions>REPLY</actions><text>Okay.</text></response>";
                Ok(safe_reply.to_string())
            }
1060            None => {
1061                // No preference: attempt local fallback
1062                warn!("Falling back to direct Ollama call (no preferred provider set)");
1063                self.call_ollama_direct(prompt).await
1064            }
1065        }
1066    }
1067
1068    /// Generate response as a stream of text chunks
1069    pub async fn generate_response_stream(
1070        &self,
1071        message: &Memory,
1072        state: &State,
1073    ) -> Result<TextStream> {
1074        let (sender, receiver) = create_text_stream(64);
1075        let handler = StreamHandler::new(sender);
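        // The full response is generated first and then re-chunked onto the stream,
        // simulating token streaming for consumers rather than provider-side streaming.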
1076
1077        let final_text = self.generate_response(message, state).await?;
1078
        tokio::spawn(async move {
            let mut idx = 0usize;
            let chunk_size = 200usize;
            while idx < final_text.len() {
                // Snap the chunk end back to a UTF-8 char boundary so slicing
                // multi-byte text cannot panic.
                let mut end = (idx + chunk_size).min(final_text.len());
                while !final_text.is_char_boundary(end) {
                    end -= 1;
                }
                let piece = final_text[idx..end].to_string();
                let is_final = end >= final_text.len();
                if handler.send_chunk(piece, is_final).await.is_err() {
                    break;
                }
                idx = end;
                if !is_final {
                    tokio::time::sleep(std::time::Duration::from_millis(150)).await;
                }
            }
        });
1095
1096        Ok(receiver)
1097    }
1098
1099    /// Direct Ollama API call (fallback if no plugins registered)
1100    async fn call_ollama_direct(&self, prompt: &str) -> Result<String> {
1101        let (model_name, model_endpoint) = {
1102            let rt = self.runtime.read().unwrap();
1103            let model = rt
1104                .get_setting("LOCAL_LLM_MODEL")
1105                .and_then(|v| v.as_str().map(|s| s.to_string()))
1106                .unwrap_or_else(|| "phi3:mini".to_string());
1107
1108            let endpoint = rt
1109                .get_setting("LOCAL_LLM_ENDPOINT")
1110                .and_then(|v| v.as_str().map(|s| s.to_string()))
1111                .unwrap_or_else(|| "http://localhost:11434".to_string());
1112
1113            (model, endpoint)
1114        };
1115
1116        let client = reqwest::Client::new();
1117        let num_predict = {
1118            let rt = self.runtime.read().unwrap();
1119            rt.get_setting("LOCAL_LLM_MAX_TOKENS")
1120                .and_then(|v| v.as_u64().map(|u| u as usize))
1121                .or_else(|| {
1122                    rt.get_setting("local_llm_num_predict")
1123                        .and_then(|v| v.as_u64().map(|u| u as usize))
1124                })
1125                .or_else(|| {
1126                    rt.get_setting("max_tokens")
1127                        .and_then(|v| v.as_u64().map(|u| u as usize))
1128                })
1129                .unwrap_or(400)
1130        };
1131        let ollama_request = serde_json::json!({
1132            "model": model_name,
1133            "prompt": prompt,
1134            "stream": false,
1135            "options": {
1136                "temperature": 0.7,
1137                "num_predict": num_predict
1138            }
1139        });
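        // A successful non-streaming /api/generate reply is JSON whose generated text
        // sits in the top-level "response" field (alongside "done" and timing stats);
        // that field is all this fallback relies on.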
1140
1141        debug!("Direct Ollama call: {} at {}", model_name, model_endpoint);
1142
1143        // Use a short timeout (5 seconds) to avoid hanging when Ollama is not available
1144        match client
1145            .post(format!("{}/api/generate", model_endpoint))
1146            .json(&ollama_request)
1147            .timeout(std::time::Duration::from_secs(5))
1148            .send()
1149            .await
1150        {
1151            Ok(response) => {
1152                if let Ok(json) = response.json::<serde_json::Value>().await {
1153                    if let Some(text) = json["response"].as_str() {
1154                        return Ok(text.to_string());
1155                    }
1156                }
1157            }
1158            Err(e) => {
1159                return Err(ZoeyError::model(format!("Ollama call failed: {}", e)));
1160            }
1161        }
1162
1163        Err(ZoeyError::model("No LLM response received"))
1164    }
1165
1166    /// Store agent's thought process for learning and reflection (direct method - deprecated)
1167    ///
1168    /// Use TrainingCollector.store_thought() instead for better training data management
1169    async fn store_thought_direct(
1170        &self,
1171        thought_text: &str,
1172        original_message: &Memory,
1173    ) -> Result<()> {
1174        info!(
1175            "💭 Agent thought (direct): {}",
1176            thought_text.chars().take(100).collect::<String>()
1177        );
1178
1179        let agent_id = self.runtime.read().unwrap().agent_id;
1180
1181        // Create thought memory with rich metadata
1182        let thought_memory = Memory {
1183            id: uuid::Uuid::new_v4(),
1184            entity_id: agent_id,
1185            agent_id,
1186            room_id: original_message.room_id,
1187            content: Content {
1188                text: thought_text.to_string(),
1189                source: Some("internal_thought".to_string()),
1190                thought: Some(thought_text.to_string()),
1191                ..Default::default()
1192            },
1193            embedding: None,
1194            metadata: Some(MemoryMetadata {
1195                memory_type: Some("thought".to_string()),
1196                entity_name: Some("ZoeyBot".to_string()),
1197                data: {
1198                    let mut meta = std::collections::HashMap::new();
1199                    meta.insert("purpose".to_string(), serde_json::json!("reflection"));
1200                    meta.insert(
1201                        "related_message".to_string(),
1202                        serde_json::json!(original_message.id.to_string()),
1203                    );
1204                    meta.insert(
1205                        "timestamp".to_string(),
1206                        serde_json::json!(chrono::Utc::now().timestamp_millis()),
1207                    );
1208                    meta.insert(
1209                        "can_be_used_for".to_string(),
1210                        serde_json::json!([
1211                            "decision_pattern_analysis",
1212                            "response_improvement",
1213                            "self_reflection",
1214                            "training_data"
1215                        ]),
1216                    );
1217                    meta
1218                },
1219            }),
1220            created_at: chrono::Utc::now().timestamp_millis(),
1221            unique: Some(false),
1222            similarity: None,
1223        };
1224
1225        // Store thought in dedicated thoughts table
1226        let adapter_opt = self.runtime.read().unwrap().adapter.read().unwrap().clone();
1227        if let Some(adapter) = adapter_opt.as_ref() {
1228            match adapter.create_memory(&thought_memory, "thoughts").await {
1229                Ok(id) => {
1230                    debug!("✓ Thought stored with ID: {}", id);
1231                    info!("💾 Stored for: pattern analysis, improvement, reflection, training");
1232                }
1233                Err(e) => warn!("Failed to store thought: {}", e),
1234            }
1235        }
1236
1237        Ok(())
1238    }
1239
1240    /// Parse LLM response to extract thought and text
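    ///
    /// Expects the XML-ish shape used by the message handler template, e.g.
    /// `<response><thought>…</thought><actions>REPLY,ASK_CLARIFY</actions><text>Hello world</text></response>`;
    /// falls back to a bare `<text>…</text>` pair, and finally to stripping all tags from the raw output.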
1241    fn parse_llm_response(&self, raw_response: &str) -> (Option<String>, String) {
1242        // Prefer <text> extraction within XML; also capture optional <actions>
1243        // Tolerate varied whitespace; optional <thought> and <actions> are captured when they precede <text>
1244        let re = regex::Regex::new(r"(?is)<response[^>]*>(?:.*?<thought>\s*(.*?)\s*</thought>)?(?:.*?<actions>\s*(.*?)\s*</actions>)?.*?<text>\s*(.*?)\s*</text>.*?</response>").unwrap();
1245        if let Some(caps) = re.captures(raw_response) {
1246            let mut thought = caps.get(1).map(|m| m.as_str().trim().to_string());
1247            let actions_match = caps.get(2).map(|m| m.as_str());
1248            let text = caps.get(3).map(|m| m.as_str()).unwrap_or("");
1249            if let Some(actions_match) = actions_match {
1250                let parsed: Vec<String> = actions_match
1251                    .split(',')
1252                    .map(|s| s.trim().to_string())
1253                    .filter(|s| !s.is_empty())
1254                    .collect();
1255                if !parsed.is_empty() {
1256                    let mut rt = self.runtime.write().unwrap();
1257                    rt.set_setting("ui:lastParsedActions", serde_json::json!(parsed), false);
1258                }
1259            }
1260            if thought.is_none() {
1261                if let Some(thought_start) = raw_response.find("<thought>") {
1262                    if let Some(thought_end) = raw_response.find("</thought>") {
1263                        thought = Some(
1264                            raw_response[thought_start + 9..thought_end]
1265                                .trim()
1266                                .to_string(),
1267                        );
1268                    }
1269                }
1270            }
1271            return (thought, text.trim().to_string());
1272        }
1273
1274        if let Some(text_start) = raw_response.find("<text>") {
1275            if let Some(text_end) = raw_response.find("</text>") {
1276                let text = &raw_response[text_start + 6..text_end];
1277                let thought = if let Some(thought_start) = raw_response.find("<thought>") {
1278                    if let Some(thought_end) = raw_response.find("</thought>") {
1279                        Some(
1280                            raw_response[thought_start + 9..thought_end]
1281                                .trim()
1282                                .to_string(),
1283                        )
1284                    } else {
1285                        None
1286                    }
1287                } else {
1288                    None
1289                };
1290                // Optional actions tag parsing (comma-separated)
1291                if let Some(actions_start) = raw_response.find("<actions>") {
1292                    if let Some(actions_end) = raw_response.find("</actions>") {
1293                        let actions_str = raw_response[actions_start + 9..actions_end].trim();
1294                        let parsed: Vec<String> = actions_str
1295                            .split(',')
1296                            .map(|s| s.trim().to_string())
1297                            .filter(|s| !s.is_empty())
1298                            .collect();
1299                        if !parsed.is_empty() {
1300                            // Store parsed actions hint for downstream execution
1301                            let mut rt = self.runtime.write().unwrap();
1302                            rt.set_setting(
1303                                "ui:lastParsedActions",
1304                                serde_json::json!(parsed),
1305                                false,
1306                            );
1307                        }
1308                    }
1309                }
1310                return (thought, text.trim().to_string());
1311            }
1312        }
1313        let cleaned = regex::Regex::new("(?s)</?[^>]+>")
1314            .unwrap()
1315            .replace_all(raw_response, "")
1316            .to_string()
1317            .trim()
1318            .to_string();
1319        let final_text = if cleaned.is_empty() {
1320            raw_response.trim().to_string()
1321        } else {
1322            cleaned
1323        };
1324        (None, final_text)
1325    }
1326
1327    /// Process actions for the message
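    ///
    /// REPLY is executed first when it validates; any additional actions hinted by the model via
    /// the `ui:lastParsedActions` setting (populated in `parse_llm_response`) are then validated
    /// and executed in order.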
1328    async fn process_actions(&self, message: &Memory, state: &State) -> Result<Vec<ActionResult>> {
1329        let rt = self.runtime.read().unwrap();
1330        let actions = rt.actions.read().unwrap();
1331
1332        // Create a dummy runtime reference
1333        let runtime_ref: Arc<dyn std::any::Any + Send + Sync> = Arc::new(());
1334
1335        // Execute REPLY first if valid, then allow additional planned actions hinted by <actions>
1336        let mut results: Vec<ActionResult> = Vec::new();
1337        if let Some(reply) = actions.iter().find(|a| a.name() == "REPLY") {
1338            match reply.validate(runtime_ref.clone(), message, state).await {
1339                Ok(true) => {
1340                    debug!("Executing action: REPLY");
1341                    if let Ok(Some(result)) = reply
1342                        .handler(runtime_ref.clone(), message, state, None, None)
1343                        .await
1344                    {
1345                        results.push(result);
1346                    }
1347                }
1348                Ok(false) => debug!("REPLY action validation failed"),
1349                Err(e) => warn!("Action REPLY validate error: {}", e),
1350            }
1351        }
1352
1353        // Optional additional actions from settings populated by parse_llm_response
1354        let planned = {
1355            let rt = self.runtime.read().unwrap();
1356            rt.get_setting("ui:lastParsedActions")
1357                .and_then(|v| v.as_array().cloned())
1358                .unwrap_or_default()
1359        };
1360        for name_val in planned {
1361            if let Some(name) = name_val.as_str() {
1362                if let Some(act) = actions.iter().find(|a| a.name().eq_ignore_ascii_case(name)) {
1363                    match act.validate(runtime_ref.clone(), message, state).await {
1364                        Ok(true) => {
1365                            debug!("Executing additional action: {}", act.name());
1366                            match act
1367                                .handler(runtime_ref.clone(), message, state, None, None)
1368                                .await
1369                            {
1370                                Ok(Some(res)) => results.push(res),
1371                                Ok(None) => {}
1372                                Err(e) => warn!("Action {} failed: {}", act.name(), e),
1373                            }
1374                        }
1375                        Ok(false) => debug!("Additional action {} validation failed", act.name()),
1376                        Err(e) => warn!("Action {} validate error: {}", act.name(), e),
1377                    }
1378                }
1379            }
1380        }
1381
1382        Ok(results)
1383    }
1384
1385    /// Run evaluators on the message
1386    async fn evaluate(
1387        &self,
1388        message: &Memory,
1389        state: &State,
1390        did_respond: bool,
1391        responses: &[Memory],
1392    ) -> Result<()> {
1393        let rt = self.runtime.read().unwrap();
1394        let evaluators = rt.evaluators.read().unwrap();
1395
1396        // Create a dummy runtime reference
1397        let runtime_ref: Arc<dyn std::any::Any + Send + Sync> = Arc::new(());
1398
1399        for evaluator in evaluators.iter() {
1400            // Check if should run
1401            let should_run = evaluator.always_run()
1402                || evaluator
1403                    .validate(runtime_ref.clone(), message, state)
1404                    .await
1405                    .unwrap_or(false);
1406
1407            if should_run {
1408                debug!("Running evaluator: {}", evaluator.name());
1409                if let Err(e) = evaluator
1410                    .handler(
1411                        runtime_ref.clone(),
1412                        message,
1413                        state,
1414                        did_respond,
1415                        Some(responses.to_vec()),
1416                    )
1417                    .await
1418                {
1419                    warn!("Evaluator {} failed: {}", evaluator.name(), e);
1420                }
1421            }
1422        }
1423
1424        Ok(())
1425    }
1426
1427    /// Compose state with proper RuntimeRef for providers
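    ///
    /// Runs every registered provider (skipping planner/recall/session_cues providers in fast mode),
    /// then injects UI_* values from runtime settings (tone, verbosity, language, intent, keywords,
    /// topics, entities, complexity, …), the last stored thought for the room, the current prompt,
    /// and a UI_COMPACT_CONTEXT flag when the composed prompt would approach the model's context window.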
1428    async fn compose_state_with_runtime_ref(&self, message: &Memory) -> Result<State> {
1429        use crate::runtime_ref::RuntimeRef;
1430
1431        // Create RuntimeRef from the runtime Arc
1432        let runtime_ref = Arc::new(RuntimeRef::new(&self.runtime));
1433        let runtime_any = runtime_ref.as_any_arc();
1434
1435        let mut state = State::new();
1436
1437        // Get providers; in fast mode, only run essential ones
1438        let (providers, fast_mode) = {
1439            let rt = self.runtime.read().unwrap();
1440            let fast = rt
1441                .get_setting("ui:fast_mode")
1442                .and_then(|v| v.as_bool())
1443                .unwrap_or(false);
1444            let list = rt.providers.read().unwrap().clone();
1445            (list, fast)
1446        };
1447
1448        // Run each provider with proper RuntimeRef
1449        for provider in &providers {
1450            if fast_mode {
1451                // Skip non-essential planning providers to reduce latency
1452                let name = provider.name().to_lowercase();
1453                if name.contains("planner")
1454                    || name.contains("recall")
1455                    || name.contains("session_cues")
1456                {
1457                    continue;
1458                }
1459            }
1460            debug!("Running provider: {}", provider.name());
1461
1462            match provider.get(runtime_any.clone(), message, &state).await {
1463                Ok(result) => {
1464                    let mut has_output = false;
1465
1466                    if let Some(ref text) = result.text {
1467                        state.set_value(provider.name().to_uppercase(), text.clone());
1468                        has_output = true;
1469                    }
1470                    if let Some(values) = result.values {
1471                        for (k, v) in values {
1472                            state.set_value(k, v);
1473                        }
1474                        has_output = true;
1475                    }
1476                    if let Some(ref data) = result.data {
1477                        for (k, v) in data.clone() {
1478                            state.set_data(k, v);
1479                        }
1480                        has_output = true;
1481                    }
1482
1483                    // Log detailed output for planning providers
1484                    if provider.name() == "reaction_planner" || provider.name() == "output_planner"
1485                    {
1486                        if has_output {
1487                            debug!(
1488                                "╔════════════════════════════════════════════════════════════════"
1489                            );
1490                            debug!("║ {} OUTPUT", provider.name().to_uppercase());
1491                            debug!(
1492                                "╠════════════════════════════════════════════════════════════════"
1493                            );
1494
1495                            if let Some(ref text) = result.text {
1496                                for line in text.lines() {
1497                                    debug!("║ {}", line);
1498                                }
1499                            }
1500
1501                            if let Some(ref data) = result.data {
1502                                if let Some(plan_data) = data.values().next() {
1503                                    debug!(
1504                                        "║ Data: {}",
1505                                        serde_json::to_string_pretty(plan_data).unwrap_or_default()
1506                                    );
1507                                }
1508                            }
1509
1510                            debug!(
1511                                "╚════════════════════════════════════════════════════════════════"
1512                            );
1513                        }
1514                    }
1515                }
1516                Err(e) => {
1517                    warn!("Provider {} failed: {}", provider.name(), e);
1518                }
1519            }
1520        }
1521        {
1522            let rt = self.runtime.read().unwrap();
1523            // Inject UI tone/verbosity from settings into state values for template use
1524            if let Some(tone) = rt
1525                .get_setting("ui:tone")
1526                .and_then(|v| v.as_str().map(|s| s.to_string()))
1527            {
1528                state.set_value("UI_TONE", tone);
1529            }
1530            if let Some(verb) = rt.get_setting("ui:verbosity") {
1531                let verb_s = if let Some(s) = verb.as_str() {
1532                    s.to_string()
1533                } else {
1534                    verb.to_string()
1535                };
1536                state.set_value("UI_VERBOSITY", verb_s);
1537            }
1538            if let Some(sug) = rt
1539                .get_setting("ui:suggestedResponseLength")
1540                .and_then(|v| v.as_str().map(|s| s.to_string()))
1541            {
1542                state.set_value("UI_SUGGESTED_RESPONSE_LENGTH", sug);
1543            }
1544            if let Some(shift) = rt
1545                .get_setting("ui:possibleTopicShift")
1546                .and_then(|v| v.as_bool())
1547            {
1548                state.set_value(
1549                    "UI_TOPIC_SHIFT",
1550                    if shift {
1551                        "true".to_string()
1552                    } else {
1553                        "false".to_string()
1554                    },
1555                );
1556            }
1557            let room_prefix = format!("ui:lastThought:{}:", message.room_id);
1558            let last_thoughts = rt
1559                .get_settings_with_prefix(&room_prefix)
1560                .into_iter()
1561                .map(|(_, v)| v)
1562                .collect::<Vec<String>>();
1563            if !last_thoughts.is_empty() {
1564                let summary = last_thoughts.join(" ");
1565                state.set_value("CONTEXT_LAST_THOUGHT", summary);
1566            }
1567            state.set_value("LAST_PROMPT", message.content.text.clone());
1568            if let Some(lang) = rt
1569                .get_setting("ui:language")
1570                .and_then(|v| v.as_str().map(|s| s.to_string()))
1571            {
1572                state.set_value("UI_LANGUAGE", lang);
1573            }
1574            if let Some(intent) = rt
1575                .get_setting("ui:intent")
1576                .and_then(|v| v.as_str().map(|s| s.to_string()))
1577            {
1578                state.set_value("UI_INTENT", intent);
1579            }
1580            if let Some(kw) = rt
1581                .get_setting("ui:keywords")
1582                .and_then(|v| v.as_array().cloned())
1583            {
1584                let joined = kw
1585                    .into_iter()
1586                    .filter_map(|x| x.as_str().map(|s| s.to_string()))
1587                    .collect::<Vec<String>>()
1588                    .join(", ");
1589                state.set_value("UI_KEYWORDS", joined);
1590            }
1591            if let Some(top) = rt
1592                .get_setting("ui:topics")
1593                .and_then(|v| v.as_array().cloned())
1594            {
1595                let joined = top
1596                    .into_iter()
1597                    .filter_map(|x| x.as_str().map(|s| s.to_string()))
1598                    .collect::<Vec<String>>()
1599                    .join(", ");
1600                state.set_value("UI_TOPICS", joined);
1601            }
1602            if let Some(ent) = rt
1603                .get_setting("ui:entities")
1604                .and_then(|v| v.as_array().cloned())
1605            {
1606                let joined = ent
1607                    .into_iter()
1608                    .filter_map(|x| x.as_str().map(|s| s.to_string()))
1609                    .collect::<Vec<String>>()
1610                    .join(", ");
1611                state.set_value("UI_ENTITIES", joined);
1612            }
1613            if let Some(arr) = rt
1614                .get_setting("phase0:agent_candidates")
1615                .and_then(|v| v.as_array().cloned())
1616            {
1617                let joined = arr
1618                    .into_iter()
1619                    .filter_map(|x| x.as_str().map(|s| s.to_string()))
1620                    .collect::<Vec<String>>()
1621                    .join(", ");
1622                state.set_value("UI_AGENT_CANDIDATES", joined);
1623            }
1624            if let Some(comp) = rt
1625                .get_setting("ui:complexity")
1626                .and_then(|v| v.as_object().cloned())
1627            {
1628                if let Some(level) = comp.get("level").and_then(|v| v.as_str()) {
1629                    state.set_value("UI_COMPLEXITY_LEVEL", level.to_string());
1630                }
1631                if let Some(reasoning) = comp.get("reasoning").and_then(|v| v.as_str()) {
1632                    state.set_value("UI_COMPLEXITY_REASONING", reasoning.to_string());
1633                }
1634            }
1635        }
1636
1637        // Emit compaction flag if composing the full prompt would approach the model context window
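        // Heuristic: estimate prompt tokens for the message-handler template and flag compaction when
        // that estimate plus a 256-token response reserve would exceed the selected model's context window.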
1638        {
1639            use crate::planner::cost::CostCalculator;
1640            use crate::planner::tokens::TokenCounter;
1641            let calc = CostCalculator::new();
1642            let template = crate::templates::MESSAGE_HANDLER_TEMPLATE;
1643            if let Ok(preview) = crate::templates::compose_prompt_from_state(&state, template) {
1644                let estimated_input = TokenCounter::estimate_tokens(&preview);
1645                let rt = self.runtime.read().unwrap();
1646                let provider_name = rt
1647                    .models
1648                    .read()
1649                    .unwrap()
1650                    .get("TEXT_LARGE")
1651                    .and_then(|v| v.first())
1652                    .map(|h| h.name.clone())
1653                    .unwrap_or_else(|| "local".to_string());
1654                let model_key = if provider_name.to_lowercase().contains("openai") {
1655                    "gpt-4o".to_string()
1656                } else if provider_name.to_lowercase().contains("claude")
1657                    || provider_name.to_lowercase().contains("anthropic")
1658                {
1659                    "claude-3.5-sonnet".to_string()
1660                } else {
1661                    "local".to_string()
1662                };
1663                if let Some(pricing) = calc.get_pricing(&model_key) {
1664                    let compact = estimated_input + 256 > pricing.context_window;
1665                    state.set_value("UI_COMPACT_CONTEXT", if compact { "true" } else { "false" });
1666                }
1667            }
1668        }
1669
1670        Ok(state)
1671    }
1672}
1673
1674#[cfg(test)]
1675mod tests {
1676    use super::*;
1677    use crate::RuntimeOpts;
1679
1680    #[tokio::test]
1681    #[ignore = "Integration test - requires Ollama running at localhost:11434"]
1682    async fn test_message_processor() {
1683        let runtime = crate::AgentRuntime::new(RuntimeOpts::default())
1684            .await
1685            .unwrap();
1686        let processor = MessageProcessor::new(runtime);
1687
1688        let message = Memory {
1689            id: uuid::Uuid::new_v4(),
1690            entity_id: uuid::Uuid::new_v4(),
1691            agent_id: uuid::Uuid::new_v4(),
1692            room_id: uuid::Uuid::new_v4(),
1693            content: Content {
1694                text: "Hello!".to_string(),
1695                ..Default::default()
1696            },
1697            embedding: None,
1698            metadata: None,
1699            created_at: chrono::Utc::now().timestamp_millis(),
1700            unique: None,
1701            similarity: None,
1702        };
1703
1704        let room = Room {
1705            id: message.room_id,
1706            agent_id: Some(message.agent_id),
1707            name: "Test Room".to_string(),
1708            source: "test".to_string(),
1709            channel_type: ChannelType::Dm,
1710            channel_id: None,
1711            server_id: None,
1712            world_id: uuid::Uuid::new_v4(),
1713            metadata: std::collections::HashMap::new(),
1714            created_at: None,
1715        };
1716
1717        let result = processor.process_message(message, room).await;
1718        if let Err(ref e) = result {
1719            eprintln!("Message processing failed: {:?}", e);
1720        }
1721        assert!(result.is_ok());
1722    }
1723
1724    #[test]
1725    fn test_parse_llm_response_xml_variants() {
1726        let runtime = crate::create_mock_runtime();
1727        let proc = MessageProcessor::new(runtime);
1728        let xml = "<response>\n<thought>think</thought>\n<actions>REPLY,ASK_CLARIFY</actions>\n<text>Hello world</text>\n</response>";
1729        let (thought, text) = proc.parse_llm_response(xml);
1730        assert_eq!(thought.as_deref(), Some("think"));
1731        assert_eq!(text, "Hello world");
1732    }
1733
1734    #[test]
1735    fn test_parse_llm_response_xml_spacing() {
1736        let runtime = crate::create_mock_runtime();
1737        let proc = MessageProcessor::new(runtime);
1738        let xml = "<response> <text> spaced </text> </response>";
1739        let (_thought, text) = proc.parse_llm_response(xml);
1740        assert_eq!(text, "spaced");
1741    }
1742
1743    #[test]
1744    fn test_tone_verbosity_injection() {
1745        let mut state = State::new();
1746        state.set_value("UI_TONE", "friendly");
1747        state.set_value("UI_VERBOSITY", "short");
1748        let tpl = "Tone: {{UI_TONE}} Verbosity: {{UI_VERBOSITY}}";
1749        let rendered = crate::templates::compose_prompt_from_state(&state, tpl).unwrap();
1750        assert!(rendered.contains("friendly"));
1751        assert!(rendered.contains("short"));
1752    }
1753
1754    #[test]
1755    fn test_parse_llm_response_malformed_actions() {
1756        let runtime = crate::create_mock_runtime();
1757        let proc = MessageProcessor::new(runtime);
1758        let xml = "<response><text>hello</text><actions> , , REPLY ,, </actions></response>";
1759        let (_thought, text) = proc.parse_llm_response(xml);
1760        assert_eq!(text, "hello");
1761    }
1762
1763    #[test]
1764    fn test_parse_llm_response_missing_wrapper() {
1765        let runtime = crate::create_mock_runtime();
1766        let proc = MessageProcessor::new(runtime);
1767        let xml = "<text>hello</text>";
1768        let (_thought, text) = proc.parse_llm_response(xml);
1769        assert_eq!(text, "hello");
1770    }
1771
1772    #[test]
1773    fn test_parse_llm_response_multiple_text() {
1774        let runtime = crate::create_mock_runtime();
1775        let proc = MessageProcessor::new(runtime);
1776        let xml = "<response><text>first</text><text>second</text></response>";
1777        let (_thought, text) = proc.parse_llm_response(xml);
1778        assert!(text == "first" || text == "second");
1779    }
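
    // Additional sketch of the tag-stripping fallback, mirroring the tests above: plain text with no
    // XML tags should pass through unchanged with no thought extracted.
    #[test]
    fn test_parse_llm_response_plain_text_fallback() {
        let runtime = crate::create_mock_runtime();
        let proc = MessageProcessor::new(runtime);
        let (thought, text) = proc.parse_llm_response("just a plain sentence");
        assert!(thought.is_none());
        assert_eq!(text, "just a plain sentence");
    }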
1780}
1781
1782// Streaming via Ollama will be implemented in provider-specific crates to avoid coupling here.