Skip to main content

kowalski_core/agent/
mod.rs

1use crate::agent::types::StreamResponse;
2use crate::config::Config;
3use crate::conversation::Conversation;
4use crate::conversation::Message;
5use crate::error::KowalskiError;
6use crate::memory::MemoryProvider;
7use crate::memory::MemoryUnit;
8use crate::memory::working::WorkingMemory;
9use crate::role::Role;
10use crate::tools::{ToolCall, ToolOutput};
11use async_trait::async_trait;
12use futures::StreamExt;
13use log::debug;
14use log::info;
15use log::warn;
16use serde_json;
17use serde_json::json;
18use std::any::Any;
19use std::collections::HashMap;
20use std::collections::HashSet;
21use std::io::{self, Write};
22use std::time::{SystemTime, UNIX_EPOCH};
23
24pub mod repl_trace;
25pub mod types;
26
27/// The core agent trait that all our specialized agents must implement.
28#[async_trait]
29pub trait Agent: Send + Sync {
30    /// Creates a new agent with the specified configuration.
31    async fn new(config: Config) -> Result<Self, KowalskiError>
32    where
33        Self: Sized;
34
35    /// Starts a new conversation
36    fn start_conversation(&mut self, model: &str) -> String;
37
38    /// Gets a conversation by ID
39    fn get_conversation(&self, id: &str) -> Option<&Conversation>;
40
41    /// Lists all conversations
42    fn list_conversations(&self) -> Vec<&Conversation>;
43
44    /// Deletes a conversation
45    fn delete_conversation(&mut self, id: &str) -> bool;
46
47    /// Chats with history (model messages) for the given conversation.
48    async fn chat_with_history(
49        &mut self,
50        conversation_id: &str,
51        content: &str,
52        role: Option<Role>,
53    ) -> Result<String, KowalskiError>;
54
55    /// Processes a stream response
56    async fn process_stream_response(
57        &mut self,
58        conversation_id: &str,
59        chunk: &[u8],
60    ) -> Result<Option<Message>, KowalskiError>;
61
62    /// Adds a message to a conversation
63    async fn add_message(&mut self, conversation_id: &str, role: &str, content: &str);
64
65    /// Exports a conversation to a JSON string
66    fn export_conversation(&self, id: &str) -> Result<String, KowalskiError>;
67
68    /// Imports a conversation from a JSON string, returns the new conversation ID
69    fn import_conversation(&mut self, json: &str) -> Result<String, KowalskiError>;
70
71    /// Executes a tool with the given name and input.
72    async fn execute_tool(
73        &mut self,
74        _tool_name: &str,
75        _tool_input: &serde_json::Value,
76    ) -> Result<ToolOutput, KowalskiError> {
77        Err(KowalskiError::ToolExecution(
78            "Tool execution not implemented for this agent".to_string(),
79        ))
80    }
81
82    /// Chat with the agent using ReAct-style tool calling
83    async fn chat_with_tools(
84        &mut self,
85        conversation_id: &str,
86        user_input: &str,
87    ) -> Result<String, KowalskiError> {
88        let mut final_response = String::new();
89        let mut current_input = user_input.to_string();
90        let mut iteration_count = 0;
91        const MAX_ITERATIONS: usize = 5; // Prevent infinite loops
92        let mut last_tool_call: Option<(String, serde_json::Value)> = None;
93        let mut tool_parse_hint_sent = false;
94
95        debug!("Starting chat_with_tools for input: '{}'", user_input);
96
97        while iteration_count < MAX_ITERATIONS {
98            iteration_count += 1;
99            debug!(" === ITERATION {} ===", iteration_count);
100            debug!("Current input: '{}'", current_input);
101
102            // Get response from LLM
103            debug!("Calling LLM...");
104            let response_text = self
105                .chat_with_history(conversation_id, &current_input, None)
106                .await?;
107
108            // Print response (simulate streaming effect or just print)
109            if repl_trace::repl_trace_enabled() {
110                println!("[agent] {}", response_text);
111            } else {
112                println!("{}", response_text);
113            }
114            io::stdout()
115                .flush()
116                .map_err(|e| KowalskiError::Server(e.to_string()))?;
117
118            let buffer = response_text.clone();
119            debug!("Full LLM response: '{}'", buffer);
120
121            // Try to extract JSON from mixed text response using robust utility
122            debug!("Attempting to extract tool calls from response...");
123            let tool_calls = crate::utils::json::extract_tool_calls(&buffer);
124
125            if !tool_calls.is_empty() {
126                // For now, we only process the first tool call found in one turn
127                let tool_call = &tool_calls[0];
128
129                // Detect repeated tool calls
130                let tool_call_key = (tool_call.name.clone(), tool_call.parameters.clone());
131                if let Some(last) = &last_tool_call
132                    && *last == tool_call_key
133                {
134                    debug!(
135                        "Detected repeated tool call. Breaking loop to prevent infinite tool call loop."
136                    );
137                    break;
138                }
139                last_tool_call = Some(tool_call_key.clone());
140
141                debug!("✅ Tool call successfully parsed!");
142                debug!("Tool: {}", tool_call.name);
143                debug!("Parameters: {}", tool_call.parameters);
144                debug!("Reasoning: {:?}", tool_call.reasoning);
145
146                if repl_trace::repl_trace_enabled() {
147                    let params = serde_json::to_string(&tool_call.parameters)
148                        .unwrap_or_else(|_| "{}".to_string());
149                    println!("[tool] {} {}", tool_call.name, params);
150                }
151
152                let tool_result = match self
153                    .execute_tool(&tool_call.name, &tool_call.parameters)
154                    .await
155                {
156                    Ok(output) => output.result.to_string(),
157                    Err(e) => {
158                        let err_msg = format!("{}", e);
159                        debug!("Tool execution failed: {}", err_msg);
160
161                        // Basic fallback/chaining logic can be integrated here if needed
162                        err_msg
163                    }
164                };
165
166                let tool_message = format!("Tool result for {}: {}", tool_call.name, tool_result);
167                self.add_message(conversation_id, "assistant", &tool_message)
168                    .await;
169                debug!("Added tool result to conversation");
170
171                current_input = format!("Based on the tool result: {}", tool_result);
172                debug!("Continuing with new input: '{}'", current_input);
173                continue;
174            }
175
176            if crate::utils::json::looks_like_tool_json_attempt(&buffer) && !tool_parse_hint_sent {
177                tool_parse_hint_sent = true;
178                let preview: String = buffer.chars().take(400).collect();
179                let total_chars = buffer.chars().count();
180                warn!(
181                    "Tool call JSON parse failed ({} chars); raw preview: {:?}",
182                    total_chars, preview
183                );
184                self.add_message(conversation_id, "assistant", &buffer)
185                    .await;
186                const HINT: &str = "Your previous reply appeared to include a tool call but it could not be parsed as JSON. Reply with a single JSON object only: {\"name\": \"<tool_name>\", \"parameters\": { ... } } matching the available tools. No markdown fences or extra text.";
187                current_input = HINT.to_string();
188                debug!("Tool JSON parse failed; requesting one self-correction turn");
189                continue;
190            }
191
192            // Not a tool call, this is the final answer
193            final_response = buffer;
194            self.add_message(conversation_id, "assistant", &final_response)
195                .await;
196            debug!("✅ Final response set: '{}'", final_response);
197
198            if let Some(tool_call) = rule_based_tool_call(user_input) {
199                debug!("Rule-based tool call triggered: {:?}", tool_call);
200                let tool_result = self
201                    .execute_tool(&tool_call.name, &tool_call.parameters)
202                    .await;
203                let tool_result_str = match tool_result {
204                    Ok(output) => output.result.to_string(),
205                    Err(e) => format!("Tool execution failed: {}", e),
206                };
207                self.add_message(conversation_id, "assistant", &tool_result_str)
208                    .await;
209                debug!("Rule-based tool result: {}", tool_result_str);
210                return Ok(tool_result_str);
211            }
212
213            break;
214        }
215
216        if iteration_count >= MAX_ITERATIONS {
217            warn!("Reached maximum iterations, returning current response");
218        }
219
220        debug!(
221            "chat_with_tools completed after {} iterations",
222            iteration_count
223        );
224        Ok(final_response)
225    }
226
227    /// Lists tools available to this agent
228    async fn list_tools(&self) -> Vec<(String, String)> {
229        Vec::new()
230    }
231
232    fn name(&self) -> &str;
233
234    /// Gets the agent's description
235    fn description(&self) -> &str;
236
237    fn as_any(&self) -> &dyn Any;
238}
239
240/// The base agent implementation that provides common functionality.
241pub struct BaseAgent {
242    pub client: reqwest::Client,
243    pub config: Config,
244    pub conversations: HashMap<String, Conversation>,
245    pub name: String,
246    pub description: String,
247    pub system_prompt: Option<String>,
248    // LLM Provider
249    pub llm_provider: std::sync::Arc<dyn crate::llm::LLMProvider>,
250    // Memory Tiers - now using dependency injection
251    pub working_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
252    pub episodic_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
253    pub semantic_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
254    // Tool Manager
255    pub tool_manager: crate::tools::manager::ToolManager,
256}
257
/// Diagnostic summary of the memory context a chat turn would use, as reported by
/// `BaseAgent::preview_memory_debug`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemoryDebugInfo {
    /// Whether any memory context (retrieved or fallback) would be injected.
    pub memory_used: bool,
    /// Where the context comes from: `"disabled"`, `"retrieved"`, `"fallback"` or `"none"`.
    pub memory_source: String,
    /// Number of retrieved memory items or fallback conversation messages.
    pub memory_items_count: usize,
}
264
265impl BaseAgent {
266    fn recent_conversation_items(messages: &[Message], max_items: usize) -> Vec<String> {
267        let mut recent: Vec<String> = messages
268            .iter()
269            .rev()
270            .filter(|m| m.role != "system")
271            .take(max_items)
272            .map(|m| format!("[{}] {}", m.role, m.content))
273            .collect();
274        recent.reverse();
275        recent
276    }
277
278    fn recent_conversation_context(messages: &[Message], max_items: usize) -> String {
279        Self::recent_conversation_items(messages, max_items).join("\n---\n")
280    }
281
282    async fn retrieve_memory_items(&self, content: &str, use_memory: bool) -> Vec<MemoryUnit> {
283        if !use_memory {
284            return Vec::new();
285        }
286
287        let working_memories = self
288            .working_memory
289            .lock()
290            .await
291            .retrieve(content, self.config.working_memory_retrieval_limit)
292            .await
293            .unwrap_or_default();
294
295        let episodic_memories = self
296            .episodic_memory
297            .lock()
298            .await
299            .retrieve(content, self.config.episodic_memory_retrieval_limit)
300            .await
301            .unwrap_or_default();
302
303        let semantic_memories = self
304            .semantic_memory
305            .lock()
306            .await
307            .retrieve(content, self.config.semantic_memory_retrieval_limit)
308            .await
309            .unwrap_or_default();
310
311        let mut seen_ids = HashSet::new();
312        let mut all_memories = Vec::new();
313        for m in working_memories
314            .into_iter()
315            .chain(episodic_memories)
316            .chain(semantic_memories)
317        {
318            if seen_ids.insert(m.id.clone()) {
319                all_memories.push(m);
320            }
321        }
322        all_memories
323    }
324
325    async fn build_memory_context(&self, content: &str, use_memory: bool) -> String {
326        let all_memories = self.retrieve_memory_items(content, use_memory).await;
327
328        if all_memories.is_empty() {
329            return String::new();
330        }
331
332        let concatenated_memories = all_memories
333            .iter()
334            .map(|m| m.content.as_str())
335            .collect::<Vec<&str>>()
336            .join("\n---\n");
337        format!(
338            "\n--- Relevant Memories ---\n{}\n--- End Memories ---",
339            concatenated_memories
340        )
341    }
342
343    pub async fn preview_memory_debug(
344        &self,
345        conversation_id: &str,
346        content: &str,
347        use_memory: bool,
348    ) -> MemoryDebugInfo {
349        if !use_memory {
350            return MemoryDebugInfo {
351                memory_used: false,
352                memory_source: "disabled".to_string(),
353                memory_items_count: 0,
354            };
355        }
356        let retrieved = self.retrieve_memory_items(content, true).await;
357        if !retrieved.is_empty() {
358            return MemoryDebugInfo {
359                memory_used: true,
360                memory_source: "retrieved".to_string(),
361                memory_items_count: retrieved.len(),
362            };
363        }
364        let fallback_count = self
365            .conversations
366            .get(conversation_id)
367            .map(|c| Self::recent_conversation_items(&c.messages, 4).len())
368            .unwrap_or(0);
369        if fallback_count > 0 {
370            return MemoryDebugInfo {
371                memory_used: true,
372                memory_source: "fallback".to_string(),
373                memory_items_count: fallback_count,
374            };
375        }
376        MemoryDebugInfo {
377            memory_used: false,
378            memory_source: "none".to_string(),
379            memory_items_count: 0,
380        }
381    }
382
383    #[allow(clippy::too_many_arguments)]
384    pub async fn new(
385        config: Config,
386        name: &str,
387        description: &str,
388        llm_provider: std::sync::Arc<dyn crate::llm::LLMProvider>,
389        working_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
390        episodic_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
391        semantic_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
392        tool_manager: crate::tools::manager::ToolManager,
393    ) -> Result<Self, KowalskiError> {
394        let client = reqwest::ClientBuilder::new()
395            .http1_only()
396            .pool_max_idle_per_host(0)
397            .build()
398            .map_err(KowalskiError::Request)?;
399
400        info!("BaseAgent created with name: {}", name);
401
402        Ok(Self {
403            client,
404            config,
405            conversations: HashMap::new(),
406            name: name.to_string(),
407            description: description.to_string(),
408            system_prompt: None,
409            llm_provider,
410            working_memory,
411            episodic_memory,
412            semantic_memory,
413            tool_manager,
414        })
415    }
416
417    pub fn set_temperature(&mut self, temperature: f32) {
418        self.config.chat.temperature = temperature;
419    }
420
421    pub fn set_system_prompt(&mut self, prompt: &str) {
422        self.system_prompt = Some(prompt.to_string());
423    }
424
425    /// Same memory + user turn as [`Agent::chat_with_history`], but returns owned messages for
426    /// [`crate::llm::LLMProvider::chat_stream`] without calling the LLM (caller streams, then
427    /// should [`Self::add_message`] with role `assistant` for the full reply).
428    pub async fn prepare_stream_turn(
429        &mut self,
430        conversation_id: &str,
431        content: &str,
432        role: Option<Role>,
433    ) -> Result<
434        (
435            String,
436            Vec<Message>,
437            std::sync::Arc<dyn crate::llm::LLMProvider>,
438        ),
439        KowalskiError,
440    > {
441        self.prepare_stream_turn_with_options(conversation_id, content, role, true)
442            .await
443    }
444
445    pub async fn prepare_stream_turn_with_options(
446        &mut self,
447        conversation_id: &str,
448        content: &str,
449        role: Option<Role>,
450        use_memory: bool,
451    ) -> Result<
452        (
453            String,
454            Vec<Message>,
455            std::sync::Arc<dyn crate::llm::LLMProvider>,
456        ),
457        KowalskiError,
458    > {
459        let memory_context = self.build_memory_context(content, use_memory).await;
460
461        let conversation = self
462            .conversations
463            .get_mut(conversation_id)
464            .ok_or_else(|| KowalskiError::ConversationNotFound(conversation_id.to_string()))?;
465
466        if let Some(role) = role {
467            conversation.add_message("system", &role.get_prompt());
468
469            if let Some(audience) = role.get_audience() {
470                conversation.add_message("system", &audience.get_prompt());
471            }
472            if let Some(preset) = role.get_preset() {
473                conversation.add_message("system", &preset.get_prompt());
474            }
475            if let Some(style) = role.get_style() {
476                conversation.add_message("system", &style.get_prompt());
477            }
478        }
479
480        let fallback_context = if use_memory && memory_context.is_empty() {
481            Self::recent_conversation_context(&conversation.messages, 4)
482        } else {
483            String::new()
484        };
485
486        conversation.add_message("user", content);
487
488        let model = conversation.model.clone();
489        let mut messages = conversation.messages.clone();
490        let effective_context = if !memory_context.is_empty() {
491            memory_context
492        } else {
493            fallback_context
494        };
495        if !effective_context.is_empty() {
496            let memory_prompt = format!(
497                "Retrieved memory context (use only if relevant to the latest user request):\n--- Relevant Memories ---\n{}\n--- End Memories ---",
498                effective_context
499            );
500            let insert_at = messages.len().saturating_sub(1);
501            messages.insert(
502                insert_at,
503                Message {
504                    role: "system".to_string(),
505                    content: memory_prompt,
506                    tool_calls: None,
507                },
508            );
509        }
510        let llm = self.llm_provider.clone();
511        Ok((model, messages, llm))
512    }
513
514    /// Like [`Agent::chat_with_tools`] but emits **token deltas** over `token_tx` only for the first
515    /// LLM completion **after at least one tool execution** in this request (final natural answer).
516    pub async fn chat_with_tools_with_options(
517        &mut self,
518        conversation_id: &str,
519        user_input: &str,
520        use_memory: bool,
521    ) -> Result<String, KowalskiError> {
522        let mut final_response = String::new();
523        let mut current_input = user_input.to_string();
524        let mut iteration_count = 0;
525        const MAX_ITERATIONS: usize = 5;
526        let mut last_tool_call: Option<(String, serde_json::Value)> = None;
527        let mut tool_parse_hint_sent = false;
528
529        while iteration_count < MAX_ITERATIONS {
530            iteration_count += 1;
531            let response_text = self
532                .chat_with_history_with_options(conversation_id, &current_input, None, use_memory)
533                .await?;
534
535            if repl_trace::repl_trace_enabled() {
536                println!("[agent] {}", response_text);
537            } else {
538                println!("{}", response_text);
539            }
540            io::stdout()
541                .flush()
542                .map_err(|e| KowalskiError::Server(e.to_string()))?;
543
544            let buffer = response_text.clone();
545            let tool_calls = crate::utils::json::extract_tool_calls(&buffer);
546
547            if !tool_calls.is_empty() {
548                let tool_call = &tool_calls[0];
549                let tool_call_key = (tool_call.name.clone(), tool_call.parameters.clone());
550                if let Some(last) = &last_tool_call
551                    && *last == tool_call_key
552                {
553                    break;
554                }
555                last_tool_call = Some(tool_call_key);
556
557                let tool_result = match self
558                    .execute_tool(&tool_call.name, &tool_call.parameters)
559                    .await
560                {
561                    Ok(output) => output.result.to_string(),
562                    Err(e) => format!("{}", e),
563                };
564
565                let tool_message = format!("Tool result for {}: {}", tool_call.name, tool_result);
566                self.add_message(conversation_id, "assistant", &tool_message)
567                    .await;
568                current_input = format!("Based on the tool result: {}", tool_result);
569                continue;
570            }
571
572            if crate::utils::json::looks_like_tool_json_attempt(&buffer) && !tool_parse_hint_sent {
573                tool_parse_hint_sent = true;
574                self.add_message(conversation_id, "assistant", &buffer)
575                    .await;
576                const HINT: &str = "Your previous reply appeared to include a tool call but it could not be parsed as JSON. Reply with a single JSON object only: {\"name\": \"<tool_name>\", \"parameters\": { ... } } matching the available tools. No markdown fences or extra text.";
577                current_input = HINT.to_string();
578                continue;
579            }
580
581            final_response = buffer;
582            self.add_message(conversation_id, "assistant", &final_response)
583                .await;
584            break;
585        }
586
587        Ok(final_response)
588    }
589
590    pub async fn chat_with_tools_stream_final(
591        &mut self,
592        conversation_id: &str,
593        user_input: &str,
594        token_tx: &tokio::sync::mpsc::Sender<String>,
595    ) -> Result<String, KowalskiError> {
596        self.chat_with_tools_stream_final_with_options(conversation_id, user_input, token_tx, true)
597            .await
598    }
599
600    pub async fn chat_with_tools_stream_final_with_options(
601        &mut self,
602        conversation_id: &str,
603        user_input: &str,
604        token_tx: &tokio::sync::mpsc::Sender<String>,
605        use_memory: bool,
606    ) -> Result<String, KowalskiError> {
607        let mut final_response = String::new();
608        let mut current_input = user_input.to_string();
609        let mut iteration_count = 0;
610        const MAX_ITERATIONS: usize = 5;
611        let mut last_tool_call: Option<(String, serde_json::Value)> = None;
612        let mut tool_parse_hint_sent = false;
613        // After a tool ran, the next LLM completion is streamed (final answer in the common case).
614        let mut stream_next_llm_turn = false;
615
616        debug!("chat_with_tools_stream_final for input: '{}'", user_input);
617
618        while iteration_count < MAX_ITERATIONS {
619            iteration_count += 1;
620            let use_stream = std::mem::replace(&mut stream_next_llm_turn, false);
621            debug!(
622                " === ITERATION {} (stream_final={}) ===",
623                iteration_count, use_stream
624            );
625
626            let response_text = if use_stream {
627                let (model, messages, llm) = self
628                    .prepare_stream_turn_with_options(
629                        conversation_id,
630                        &current_input,
631                        None,
632                        use_memory,
633                    )
634                    .await?;
635                let mut full = String::new();
636                let mut stream = llm.chat_stream(&model, messages);
637                while let Some(item) = stream.next().await {
638                    let delta = item?;
639                    if !delta.is_empty() {
640                        full.push_str(&delta);
641                        let _ = token_tx.send(delta).await;
642                    }
643                }
644                full
645            } else {
646                self.chat_with_history_with_options(
647                    conversation_id,
648                    &current_input,
649                    None,
650                    use_memory,
651                )
652                .await?
653            };
654
655            if repl_trace::repl_trace_enabled() {
656                println!("[agent] {}", response_text);
657            } else {
658                println!("{}", response_text);
659            }
660            io::stdout()
661                .flush()
662                .map_err(|e| KowalskiError::Server(e.to_string()))?;
663
664            let buffer = response_text.clone();
665            let tool_calls = crate::utils::json::extract_tool_calls(&buffer);
666
667            if !tool_calls.is_empty() {
668                let tool_call = &tool_calls[0];
669                let tool_call_key = (tool_call.name.clone(), tool_call.parameters.clone());
670                if let Some(last) = &last_tool_call
671                    && *last == tool_call_key
672                {
673                    debug!("Repeated tool call; breaking");
674                    break;
675                }
676                last_tool_call = Some(tool_call_key.clone());
677
678                if repl_trace::repl_trace_enabled() {
679                    let params = serde_json::to_string(&tool_call.parameters)
680                        .unwrap_or_else(|_| "{}".to_string());
681                    println!("[tool] {} {}", tool_call.name, params);
682                }
683
684                let tool_result = match self
685                    .execute_tool(&tool_call.name, &tool_call.parameters)
686                    .await
687                {
688                    Ok(output) => output.result.to_string(),
689                    Err(e) => format!("{}", e),
690                };
691
692                let tool_message = format!("Tool result for {}: {}", tool_call.name, tool_result);
693                self.add_message(conversation_id, "assistant", &tool_message)
694                    .await;
695
696                current_input = format!("Based on the tool result: {}", tool_result);
697                stream_next_llm_turn = true;
698                continue;
699            }
700
701            if crate::utils::json::looks_like_tool_json_attempt(&buffer) && !tool_parse_hint_sent {
702                tool_parse_hint_sent = true;
703                warn!("Tool call JSON parse failed; requesting self-correction (non-stream)");
704                self.add_message(conversation_id, "assistant", &buffer)
705                    .await;
706                const HINT: &str = "Your previous reply appeared to include a tool call but it could not be parsed as JSON. Reply with a single JSON object only: {\"name\": \"<tool_name>\", \"parameters\": { ... } } matching the available tools. No markdown fences or extra text.";
707                current_input = HINT.to_string();
708                stream_next_llm_turn = false;
709                continue;
710            }
711
712            final_response = buffer;
713            self.add_message(conversation_id, "assistant", &final_response)
714                .await;
715
716            if let Some(tool_call) = rule_based_tool_call(user_input) {
717                let tool_result_str = match self
718                    .execute_tool(&tool_call.name, &tool_call.parameters)
719                    .await
720                {
721                    Ok(output) => output.result.to_string(),
722                    Err(e) => format!("Tool execution failed: {}", e),
723                };
724                self.add_message(conversation_id, "assistant", &tool_result_str)
725                    .await;
726                return Ok(tool_result_str);
727            }
728
729            break;
730        }
731
732        if iteration_count >= MAX_ITERATIONS {
733            warn!("Reached maximum iterations (stream_final)");
734        }
735
736        Ok(final_response)
737    }
738}
739
740#[async_trait]
741impl Agent for BaseAgent {
742    async fn new(config: Config) -> Result<Self, KowalskiError> {
743        crate::db::run_memory_migrations_if_configured(&config).await?;
744
745        let llm_provider = crate::llm::create_llm_provider(&config)?;
746
747        // Create memory providers
748        let working_memory = std::sync::Arc::new(tokio::sync::Mutex::new(WorkingMemory::new(100)))
749            as std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>;
750
751        let episodic_memory = std::sync::Arc::new(tokio::sync::Mutex::new(
752            crate::memory::episodic::EpisodicBuffer::open(&config.memory, llm_provider.clone())
753                .await?,
754        ))
755            as std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>;
756
757        let semantic_memory =
758            crate::memory::helpers::create_semantic_memory(&config, llm_provider.clone()).await?;
759
760        Self::new(
761            config,
762            "Base Agent",
763            "A basic agent implementation",
764            llm_provider,
765            working_memory,
766            episodic_memory,
767            semantic_memory,
768            crate::tools::manager::ToolManager::new(),
769        )
770        .await
771    }
772
773    fn start_conversation(&mut self, model: &str) -> String {
774        info!("Starting conversation with model: {}", model);
775        let conversation = Conversation::new(model);
776        let id = conversation.id.clone();
777        self.conversations.insert(id.clone(), conversation);
778        id
779    }
780
781    fn get_conversation(&self, id: &str) -> Option<&Conversation> {
782        self.conversations.get(id)
783    }
784
785    fn list_conversations(&self) -> Vec<&Conversation> {
786        self.conversations.values().collect()
787    }
788
789    fn delete_conversation(&mut self, id: &str) -> bool {
790        self.conversations.remove(id).is_some()
791    }
792
793    async fn chat_with_history(
794        &mut self,
795        conversation_id: &str,
796        content: &str,
797        role: Option<Role>,
798    ) -> Result<String, KowalskiError> {
799        self.chat_with_history_with_options(conversation_id, content, role, true)
800            .await
801    }
802
    /// Parses one streamed chunk by delegating to the inherent
    /// `BaseAgent::process_stream_response`.
    ///
    /// The fully-qualified path is deliberate: it makes explicit that the
    /// inherent method (not this trait method) is the one being invoked.
    async fn process_stream_response(
        &mut self,
        conversation_id: &str,
        chunk: &[u8],
    ) -> Result<Option<Message>, KowalskiError> {
        BaseAgent::process_stream_response(self, conversation_id, chunk).await
    }
810
    /// Records a message by delegating to the inherent `BaseAgent::add_message`,
    /// which archives it to the agent's memory tiers and appends it to the
    /// conversation when that conversation exists.
    async fn add_message(&mut self, conversation_id: &str, role: &str, content: &str) {
        BaseAgent::add_message(self, conversation_id, role, content).await;
    }
814
    /// Serializes the conversation `id` to a JSON string (delegates to the
    /// inherent `BaseAgent::export_conversation`).
    fn export_conversation(&self, id: &str) -> Result<String, KowalskiError> {
        BaseAgent::export_conversation(self, id)
    }
818
    /// Deserializes a conversation from JSON, stores it, and returns its ID
    /// (delegates to the inherent `BaseAgent::import_conversation`).
    fn import_conversation(&mut self, json_str: &str) -> Result<String, KowalskiError> {
        BaseAgent::import_conversation(self, json_str)
    }
822
    /// Human-readable agent name, as stored on the agent at construction.
    fn name(&self) -> &str {
        &self.name
    }
826
    /// Short human-readable description of the agent, as stored at construction.
    fn description(&self) -> &str {
        &self.description
    }
830
    /// Exposes the agent as `&dyn Any` so callers holding a `dyn Agent` can
    /// attempt a downcast back to `BaseAgent`.
    fn as_any(&self) -> &dyn Any {
        self
    }
834}
835
836impl BaseAgent {
837    pub async fn chat_with_history_with_options(
838        &mut self,
839        conversation_id: &str,
840        content: &str,
841        role: Option<Role>,
842        use_memory: bool,
843    ) -> Result<String, KowalskiError> {
844        let memory_context = self.build_memory_context(content, use_memory).await;
845
846        let conversation = self
847            .conversations
848            .get_mut(conversation_id)
849            .ok_or_else(|| KowalskiError::ConversationNotFound(conversation_id.to_string()))?;
850
851        if let Some(role) = role {
852            conversation.add_message("system", &role.get_prompt());
853
854            if let Some(audience) = role.get_audience() {
855                conversation.add_message("system", &audience.get_prompt());
856            }
857            if let Some(preset) = role.get_preset() {
858                conversation.add_message("system", &preset.get_prompt());
859            }
860            if let Some(style) = role.get_style() {
861                conversation.add_message("system", &style.get_prompt());
862            }
863        }
864
865        let fallback_context = if use_memory && memory_context.is_empty() {
866            Self::recent_conversation_context(&conversation.messages, 4)
867        } else {
868            String::new()
869        };
870
871        // Persist raw user input in conversation history.
872        conversation.add_message("user", content);
873
874        // Build request-time LLM messages: conversation history + optional memory context.
875        // Memory context is ephemeral (not persisted as conversation turns).
876        let mut llm_messages = conversation.messages.clone();
877        let effective_context = if !memory_context.is_empty() {
878            memory_context
879        } else {
880            fallback_context
881        };
882        if !effective_context.is_empty() {
883            let memory_prompt = format!(
884                "Retrieved memory context (use only if relevant to the latest user request):\n--- Relevant Memories ---\n{}\n--- End Memories ---",
885                effective_context
886            );
887            let insert_at = llm_messages.len().saturating_sub(1);
888            llm_messages.insert(
889                insert_at,
890                Message {
891                    role: "system".to_string(),
892                    content: memory_prompt,
893                    tool_calls: None,
894                },
895            );
896        }
897
898        // Delegate to LLM Provider
899        let response = self
900            .llm_provider
901            .chat(&conversation.model, &llm_messages)
902            .await?;
903
904        Ok(response)
905    }
906
907    async fn process_stream_response(
908        &mut self,
909        _conversation_id: &str,
910        chunk: &[u8],
911    ) -> Result<Option<Message>, KowalskiError> {
912        let text = String::from_utf8(chunk.to_vec())
913            .map_err(|e| KowalskiError::Server(format!("Invalid UTF-8: {}", e)))?;
914
915        let stream_response: StreamResponse =
916            serde_json::from_str(&text).map_err(KowalskiError::Json)?;
917
918        if stream_response.done {
919            return Ok(None);
920        }
921
922        Ok(Some(stream_response.message))
923    }
924
925    async fn execute_tool(
926        &mut self,
927        tool_name: &str,
928        tool_input: &serde_json::Value,
929    ) -> Result<ToolOutput, KowalskiError> {
930        let task_type = tool_input
931            .get("task")
932            .and_then(|v| v.as_str())
933            .unwrap_or("default")
934            .to_string();
935        let content = tool_input
936            .get("content")
937            .and_then(|v| v.as_str())
938            .unwrap_or("")
939            .to_string();
940
941        let input = crate::tools::ToolInput::new(task_type, content, tool_input.clone());
942
943        self.tool_manager.execute(tool_name, input).await
944    }
945
946    async fn add_message(&mut self, conversation_id: &str, role: &str, content: &str) {
947        // 2. STORAGE: Archive the message to the episodic buffer
948        let now = SystemTime::now()
949            .duration_since(UNIX_EPOCH)
950            .unwrap_or_default();
951        let timestamp = now.as_secs();
952        let nanos = now.as_nanos();
953
954        let memory_unit = MemoryUnit {
955            // Use nanosecond precision to avoid collisions when multiple messages
956            // are added in the same second.
957            id: format!("{}-{}-{}-{}", conversation_id, timestamp, nanos, role),
958            timestamp,
959            content: format!("[{}] {}", role, content),
960            embedding: None, // Embeddings are generated during consolidation
961        };
962
963        // Add to Tier 1 working memory
964        if let Err(e) = self
965            .working_memory
966            .lock()
967            .await
968            .add(memory_unit.clone())
969            .await
970        {
971            eprintln!("Failed to add to working memory: {}", e);
972        }
973
974        // Add to Tier 2 episodic buffer
975        if let Err(e) = self.episodic_memory.lock().await.add(memory_unit).await {
976            eprintln!("Failed to add to episodic memory: {}", e);
977        }
978
979        if let Some(conversation) = self.conversations.get_mut(conversation_id) {
980            conversation.add_message(role, content);
981        }
982    }
983
984    fn export_conversation(&self, id: &str) -> Result<String, KowalskiError> {
985        let conversation = self
986            .conversations
987            .get(id)
988            .ok_or_else(|| KowalskiError::ConversationNotFound(id.to_string()))?;
989        serde_json::to_string(conversation).map_err(KowalskiError::Json)
990    }
991
992    fn import_conversation(&mut self, json_str: &str) -> Result<String, KowalskiError> {
993        let conversation: crate::conversation::Conversation =
994            serde_json::from_str(json_str).map_err(KowalskiError::Json)?;
995        let id = conversation.id.clone();
996        self.conversations.insert(id.clone(), conversation);
997        Ok(id)
998    }
999}
1000
1001#[async_trait]
1002pub trait MessageHandler: Send + Sync {
1003    type Message;
1004    type Error;
1005
1006    async fn handle_message(&mut self, message: Self::Message) -> Result<(), Self::Error>;
1007}
1008
1009fn rule_based_tool_call(user_input: &str) -> Option<ToolCall> {
1010    let input = user_input.to_lowercase();
1011    if input.contains("list")
1012        && input.contains("directory")
1013        && let Some(path) = input.split_whitespace().find(|w| w.starts_with('/'))
1014    {
1015        return Some(ToolCall {
1016            name: "fs_tool".to_string(),
1017            parameters: json!({ "task": "list_dir", "path": path }),
1018            reasoning: Some("Rule-based: user asked to list a directory".to_string()),
1019        });
1020    }
1021    if input.contains("first 10 lines")
1022        && input.contains(".csv")
1023        && let Some(path) = input.split_whitespace().find(|w| w.ends_with(".csv"))
1024    {
1025        return Some(ToolCall {
1026            name: "fs_tool".to_string(),
1027            parameters: json!({ "task": "get_file_first_lines", "path": path, "num_lines": 10 }),
1028            reasoning: Some("Rule-based: user asked for first 10 lines of a CSV".to_string()),
1029        });
1030    }
1031    // Add more rules as needed...
1032    None
1033}