1use crate::agent::types::StreamResponse;
2use crate::config::Config;
3use crate::conversation::Conversation;
4use crate::conversation::Message;
5use crate::error::KowalskiError;
6use crate::memory::MemoryProvider;
7use crate::memory::MemoryUnit;
8use crate::memory::working::WorkingMemory;
9use crate::role::Role;
10use crate::tools::{ToolCall, ToolOutput};
11use async_trait::async_trait;
12use futures::StreamExt;
13use log::debug;
14use log::info;
15use log::warn;
16use serde_json;
17use serde_json::json;
18use std::any::Any;
19use std::collections::HashMap;
20use std::collections::HashSet;
21use std::io::{self, Write};
22use std::time::{SystemTime, UNIX_EPOCH};
23
24pub mod repl_trace;
25pub mod types;
26
27#[async_trait]
29pub trait Agent: Send + Sync {
30 async fn new(config: Config) -> Result<Self, KowalskiError>
32 where
33 Self: Sized;
34
35 fn start_conversation(&mut self, model: &str) -> String;
37
38 fn get_conversation(&self, id: &str) -> Option<&Conversation>;
40
41 fn list_conversations(&self) -> Vec<&Conversation>;
43
44 fn delete_conversation(&mut self, id: &str) -> bool;
46
47 async fn chat_with_history(
49 &mut self,
50 conversation_id: &str,
51 content: &str,
52 role: Option<Role>,
53 ) -> Result<String, KowalskiError>;
54
55 async fn process_stream_response(
57 &mut self,
58 conversation_id: &str,
59 chunk: &[u8],
60 ) -> Result<Option<Message>, KowalskiError>;
61
62 async fn add_message(&mut self, conversation_id: &str, role: &str, content: &str);
64
65 fn export_conversation(&self, id: &str) -> Result<String, KowalskiError>;
67
68 fn import_conversation(&mut self, json: &str) -> Result<String, KowalskiError>;
70
71 async fn execute_tool(
73 &mut self,
74 _tool_name: &str,
75 _tool_input: &serde_json::Value,
76 ) -> Result<ToolOutput, KowalskiError> {
77 Err(KowalskiError::ToolExecution(
78 "Tool execution not implemented for this agent".to_string(),
79 ))
80 }
81
82 async fn chat_with_tools(
84 &mut self,
85 conversation_id: &str,
86 user_input: &str,
87 ) -> Result<String, KowalskiError> {
88 let mut final_response = String::new();
89 let mut current_input = user_input.to_string();
90 let mut iteration_count = 0;
91 const MAX_ITERATIONS: usize = 5; let mut last_tool_call: Option<(String, serde_json::Value)> = None;
93 let mut tool_parse_hint_sent = false;
94
95 debug!("Starting chat_with_tools for input: '{}'", user_input);
96
97 while iteration_count < MAX_ITERATIONS {
98 iteration_count += 1;
99 debug!(" === ITERATION {} ===", iteration_count);
100 debug!("Current input: '{}'", current_input);
101
102 debug!("Calling LLM...");
104 let response_text = self
105 .chat_with_history(conversation_id, ¤t_input, None)
106 .await?;
107
108 if repl_trace::repl_trace_enabled() {
110 println!("[agent] {}", response_text);
111 } else {
112 println!("{}", response_text);
113 }
114 io::stdout()
115 .flush()
116 .map_err(|e| KowalskiError::Server(e.to_string()))?;
117
118 let buffer = response_text.clone();
119 debug!("Full LLM response: '{}'", buffer);
120
121 debug!("Attempting to extract tool calls from response...");
123 let tool_calls = crate::utils::json::extract_tool_calls(&buffer);
124
125 if !tool_calls.is_empty() {
126 let tool_call = &tool_calls[0];
128
129 let tool_call_key = (tool_call.name.clone(), tool_call.parameters.clone());
131 if let Some(last) = &last_tool_call
132 && *last == tool_call_key
133 {
134 debug!(
135 "Detected repeated tool call. Breaking loop to prevent infinite tool call loop."
136 );
137 break;
138 }
139 last_tool_call = Some(tool_call_key.clone());
140
141 debug!("✅ Tool call successfully parsed!");
142 debug!("Tool: {}", tool_call.name);
143 debug!("Parameters: {}", tool_call.parameters);
144 debug!("Reasoning: {:?}", tool_call.reasoning);
145
146 if repl_trace::repl_trace_enabled() {
147 let params = serde_json::to_string(&tool_call.parameters)
148 .unwrap_or_else(|_| "{}".to_string());
149 println!("[tool] {} {}", tool_call.name, params);
150 }
151
152 let tool_result = match self
153 .execute_tool(&tool_call.name, &tool_call.parameters)
154 .await
155 {
156 Ok(output) => output.result.to_string(),
157 Err(e) => {
158 let err_msg = format!("{}", e);
159 debug!("Tool execution failed: {}", err_msg);
160
161 err_msg
163 }
164 };
165
166 let tool_message = format!("Tool result for {}: {}", tool_call.name, tool_result);
167 self.add_message(conversation_id, "assistant", &tool_message)
168 .await;
169 debug!("Added tool result to conversation");
170
171 current_input = format!("Based on the tool result: {}", tool_result);
172 debug!("Continuing with new input: '{}'", current_input);
173 continue;
174 }
175
176 if crate::utils::json::looks_like_tool_json_attempt(&buffer) && !tool_parse_hint_sent {
177 tool_parse_hint_sent = true;
178 let preview: String = buffer.chars().take(400).collect();
179 let total_chars = buffer.chars().count();
180 warn!(
181 "Tool call JSON parse failed ({} chars); raw preview: {:?}",
182 total_chars, preview
183 );
184 self.add_message(conversation_id, "assistant", &buffer)
185 .await;
186 const HINT: &str = "Your previous reply appeared to include a tool call but it could not be parsed as JSON. Reply with a single JSON object only: {\"name\": \"<tool_name>\", \"parameters\": { ... } } matching the available tools. No markdown fences or extra text.";
187 current_input = HINT.to_string();
188 debug!("Tool JSON parse failed; requesting one self-correction turn");
189 continue;
190 }
191
192 final_response = buffer;
194 self.add_message(conversation_id, "assistant", &final_response)
195 .await;
196 debug!("✅ Final response set: '{}'", final_response);
197
198 if let Some(tool_call) = rule_based_tool_call(user_input) {
199 debug!("Rule-based tool call triggered: {:?}", tool_call);
200 let tool_result = self
201 .execute_tool(&tool_call.name, &tool_call.parameters)
202 .await;
203 let tool_result_str = match tool_result {
204 Ok(output) => output.result.to_string(),
205 Err(e) => format!("Tool execution failed: {}", e),
206 };
207 self.add_message(conversation_id, "assistant", &tool_result_str)
208 .await;
209 debug!("Rule-based tool result: {}", tool_result_str);
210 return Ok(tool_result_str);
211 }
212
213 break;
214 }
215
216 if iteration_count >= MAX_ITERATIONS {
217 warn!("Reached maximum iterations, returning current response");
218 }
219
220 debug!(
221 "chat_with_tools completed after {} iterations",
222 iteration_count
223 );
224 Ok(final_response)
225 }
226
227 async fn list_tools(&self) -> Vec<(String, String)> {
229 Vec::new()
230 }
231
232 fn name(&self) -> &str;
233
234 fn description(&self) -> &str;
236
237 fn as_any(&self) -> &dyn Any;
238}
239
240pub struct BaseAgent {
242 pub client: reqwest::Client,
243 pub config: Config,
244 pub conversations: HashMap<String, Conversation>,
245 pub name: String,
246 pub description: String,
247 pub system_prompt: Option<String>,
248 pub llm_provider: std::sync::Arc<dyn crate::llm::LLMProvider>,
250 pub working_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
252 pub episodic_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
253 pub semantic_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
254 pub tool_manager: crate::tools::manager::ToolManager,
256}
257
258#[derive(Debug, Clone)]
259pub struct MemoryDebugInfo {
260 pub memory_used: bool,
261 pub memory_source: String,
262 pub memory_items_count: usize,
263}
264
265impl BaseAgent {
266 fn recent_conversation_items(messages: &[Message], max_items: usize) -> Vec<String> {
267 let mut recent: Vec<String> = messages
268 .iter()
269 .rev()
270 .filter(|m| m.role != "system")
271 .take(max_items)
272 .map(|m| format!("[{}] {}", m.role, m.content))
273 .collect();
274 recent.reverse();
275 recent
276 }
277
278 fn recent_conversation_context(messages: &[Message], max_items: usize) -> String {
279 Self::recent_conversation_items(messages, max_items).join("\n---\n")
280 }
281
282 async fn retrieve_memory_items(&self, content: &str, use_memory: bool) -> Vec<MemoryUnit> {
283 if !use_memory {
284 return Vec::new();
285 }
286
287 let working_memories = self
288 .working_memory
289 .lock()
290 .await
291 .retrieve(content, self.config.working_memory_retrieval_limit)
292 .await
293 .unwrap_or_default();
294
295 let episodic_memories = self
296 .episodic_memory
297 .lock()
298 .await
299 .retrieve(content, self.config.episodic_memory_retrieval_limit)
300 .await
301 .unwrap_or_default();
302
303 let semantic_memories = self
304 .semantic_memory
305 .lock()
306 .await
307 .retrieve(content, self.config.semantic_memory_retrieval_limit)
308 .await
309 .unwrap_or_default();
310
311 let mut seen_ids = HashSet::new();
312 let mut all_memories = Vec::new();
313 for m in working_memories
314 .into_iter()
315 .chain(episodic_memories)
316 .chain(semantic_memories)
317 {
318 if seen_ids.insert(m.id.clone()) {
319 all_memories.push(m);
320 }
321 }
322 all_memories
323 }
324
325 async fn build_memory_context(&self, content: &str, use_memory: bool) -> String {
326 let all_memories = self.retrieve_memory_items(content, use_memory).await;
327
328 if all_memories.is_empty() {
329 return String::new();
330 }
331
332 let concatenated_memories = all_memories
333 .iter()
334 .map(|m| m.content.as_str())
335 .collect::<Vec<&str>>()
336 .join("\n---\n");
337 format!(
338 "\n--- Relevant Memories ---\n{}\n--- End Memories ---",
339 concatenated_memories
340 )
341 }
342
343 pub async fn preview_memory_debug(
344 &self,
345 conversation_id: &str,
346 content: &str,
347 use_memory: bool,
348 ) -> MemoryDebugInfo {
349 if !use_memory {
350 return MemoryDebugInfo {
351 memory_used: false,
352 memory_source: "disabled".to_string(),
353 memory_items_count: 0,
354 };
355 }
356 let retrieved = self.retrieve_memory_items(content, true).await;
357 if !retrieved.is_empty() {
358 return MemoryDebugInfo {
359 memory_used: true,
360 memory_source: "retrieved".to_string(),
361 memory_items_count: retrieved.len(),
362 };
363 }
364 let fallback_count = self
365 .conversations
366 .get(conversation_id)
367 .map(|c| Self::recent_conversation_items(&c.messages, 4).len())
368 .unwrap_or(0);
369 if fallback_count > 0 {
370 return MemoryDebugInfo {
371 memory_used: true,
372 memory_source: "fallback".to_string(),
373 memory_items_count: fallback_count,
374 };
375 }
376 MemoryDebugInfo {
377 memory_used: false,
378 memory_source: "none".to_string(),
379 memory_items_count: 0,
380 }
381 }
382
383 #[allow(clippy::too_many_arguments)]
384 pub async fn new(
385 config: Config,
386 name: &str,
387 description: &str,
388 llm_provider: std::sync::Arc<dyn crate::llm::LLMProvider>,
389 working_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
390 episodic_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
391 semantic_memory: std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>,
392 tool_manager: crate::tools::manager::ToolManager,
393 ) -> Result<Self, KowalskiError> {
394 let client = reqwest::ClientBuilder::new()
395 .http1_only()
396 .pool_max_idle_per_host(0)
397 .build()
398 .map_err(KowalskiError::Request)?;
399
400 info!("BaseAgent created with name: {}", name);
401
402 Ok(Self {
403 client,
404 config,
405 conversations: HashMap::new(),
406 name: name.to_string(),
407 description: description.to_string(),
408 system_prompt: None,
409 llm_provider,
410 working_memory,
411 episodic_memory,
412 semantic_memory,
413 tool_manager,
414 })
415 }
416
417 pub fn set_temperature(&mut self, temperature: f32) {
418 self.config.chat.temperature = temperature;
419 }
420
421 pub fn set_system_prompt(&mut self, prompt: &str) {
422 self.system_prompt = Some(prompt.to_string());
423 }
424
425 pub async fn prepare_stream_turn(
429 &mut self,
430 conversation_id: &str,
431 content: &str,
432 role: Option<Role>,
433 ) -> Result<
434 (
435 String,
436 Vec<Message>,
437 std::sync::Arc<dyn crate::llm::LLMProvider>,
438 ),
439 KowalskiError,
440 > {
441 self.prepare_stream_turn_with_options(conversation_id, content, role, true)
442 .await
443 }
444
445 pub async fn prepare_stream_turn_with_options(
446 &mut self,
447 conversation_id: &str,
448 content: &str,
449 role: Option<Role>,
450 use_memory: bool,
451 ) -> Result<
452 (
453 String,
454 Vec<Message>,
455 std::sync::Arc<dyn crate::llm::LLMProvider>,
456 ),
457 KowalskiError,
458 > {
459 let memory_context = self.build_memory_context(content, use_memory).await;
460
461 let conversation = self
462 .conversations
463 .get_mut(conversation_id)
464 .ok_or_else(|| KowalskiError::ConversationNotFound(conversation_id.to_string()))?;
465
466 if let Some(role) = role {
467 conversation.add_message("system", &role.get_prompt());
468
469 if let Some(audience) = role.get_audience() {
470 conversation.add_message("system", &audience.get_prompt());
471 }
472 if let Some(preset) = role.get_preset() {
473 conversation.add_message("system", &preset.get_prompt());
474 }
475 if let Some(style) = role.get_style() {
476 conversation.add_message("system", &style.get_prompt());
477 }
478 }
479
480 let fallback_context = if use_memory && memory_context.is_empty() {
481 Self::recent_conversation_context(&conversation.messages, 4)
482 } else {
483 String::new()
484 };
485
486 conversation.add_message("user", content);
487
488 let model = conversation.model.clone();
489 let mut messages = conversation.messages.clone();
490 let effective_context = if !memory_context.is_empty() {
491 memory_context
492 } else {
493 fallback_context
494 };
495 if !effective_context.is_empty() {
496 let memory_prompt = format!(
497 "Retrieved memory context (use only if relevant to the latest user request):\n--- Relevant Memories ---\n{}\n--- End Memories ---",
498 effective_context
499 );
500 let insert_at = messages.len().saturating_sub(1);
501 messages.insert(
502 insert_at,
503 Message {
504 role: "system".to_string(),
505 content: memory_prompt,
506 tool_calls: None,
507 },
508 );
509 }
510 let llm = self.llm_provider.clone();
511 Ok((model, messages, llm))
512 }
513
514 pub async fn chat_with_tools_with_options(
517 &mut self,
518 conversation_id: &str,
519 user_input: &str,
520 use_memory: bool,
521 ) -> Result<String, KowalskiError> {
522 let mut final_response = String::new();
523 let mut current_input = user_input.to_string();
524 let mut iteration_count = 0;
525 const MAX_ITERATIONS: usize = 5;
526 let mut last_tool_call: Option<(String, serde_json::Value)> = None;
527 let mut tool_parse_hint_sent = false;
528
529 while iteration_count < MAX_ITERATIONS {
530 iteration_count += 1;
531 let response_text = self
532 .chat_with_history_with_options(conversation_id, ¤t_input, None, use_memory)
533 .await?;
534
535 if repl_trace::repl_trace_enabled() {
536 println!("[agent] {}", response_text);
537 } else {
538 println!("{}", response_text);
539 }
540 io::stdout()
541 .flush()
542 .map_err(|e| KowalskiError::Server(e.to_string()))?;
543
544 let buffer = response_text.clone();
545 let tool_calls = crate::utils::json::extract_tool_calls(&buffer);
546
547 if !tool_calls.is_empty() {
548 let tool_call = &tool_calls[0];
549 let tool_call_key = (tool_call.name.clone(), tool_call.parameters.clone());
550 if let Some(last) = &last_tool_call
551 && *last == tool_call_key
552 {
553 break;
554 }
555 last_tool_call = Some(tool_call_key);
556
557 let tool_result = match self
558 .execute_tool(&tool_call.name, &tool_call.parameters)
559 .await
560 {
561 Ok(output) => output.result.to_string(),
562 Err(e) => format!("{}", e),
563 };
564
565 let tool_message = format!("Tool result for {}: {}", tool_call.name, tool_result);
566 self.add_message(conversation_id, "assistant", &tool_message)
567 .await;
568 current_input = format!("Based on the tool result: {}", tool_result);
569 continue;
570 }
571
572 if crate::utils::json::looks_like_tool_json_attempt(&buffer) && !tool_parse_hint_sent {
573 tool_parse_hint_sent = true;
574 self.add_message(conversation_id, "assistant", &buffer)
575 .await;
576 const HINT: &str = "Your previous reply appeared to include a tool call but it could not be parsed as JSON. Reply with a single JSON object only: {\"name\": \"<tool_name>\", \"parameters\": { ... } } matching the available tools. No markdown fences or extra text.";
577 current_input = HINT.to_string();
578 continue;
579 }
580
581 final_response = buffer;
582 self.add_message(conversation_id, "assistant", &final_response)
583 .await;
584 break;
585 }
586
587 Ok(final_response)
588 }
589
590 pub async fn chat_with_tools_stream_final(
591 &mut self,
592 conversation_id: &str,
593 user_input: &str,
594 token_tx: &tokio::sync::mpsc::Sender<String>,
595 ) -> Result<String, KowalskiError> {
596 self.chat_with_tools_stream_final_with_options(conversation_id, user_input, token_tx, true)
597 .await
598 }
599
600 pub async fn chat_with_tools_stream_final_with_options(
601 &mut self,
602 conversation_id: &str,
603 user_input: &str,
604 token_tx: &tokio::sync::mpsc::Sender<String>,
605 use_memory: bool,
606 ) -> Result<String, KowalskiError> {
607 let mut final_response = String::new();
608 let mut current_input = user_input.to_string();
609 let mut iteration_count = 0;
610 const MAX_ITERATIONS: usize = 5;
611 let mut last_tool_call: Option<(String, serde_json::Value)> = None;
612 let mut tool_parse_hint_sent = false;
613 let mut stream_next_llm_turn = false;
615
616 debug!("chat_with_tools_stream_final for input: '{}'", user_input);
617
618 while iteration_count < MAX_ITERATIONS {
619 iteration_count += 1;
620 let use_stream = std::mem::replace(&mut stream_next_llm_turn, false);
621 debug!(
622 " === ITERATION {} (stream_final={}) ===",
623 iteration_count, use_stream
624 );
625
626 let response_text = if use_stream {
627 let (model, messages, llm) = self
628 .prepare_stream_turn_with_options(
629 conversation_id,
630 ¤t_input,
631 None,
632 use_memory,
633 )
634 .await?;
635 let mut full = String::new();
636 let mut stream = llm.chat_stream(&model, messages);
637 while let Some(item) = stream.next().await {
638 let delta = item?;
639 if !delta.is_empty() {
640 full.push_str(&delta);
641 let _ = token_tx.send(delta).await;
642 }
643 }
644 full
645 } else {
646 self.chat_with_history_with_options(
647 conversation_id,
648 ¤t_input,
649 None,
650 use_memory,
651 )
652 .await?
653 };
654
655 if repl_trace::repl_trace_enabled() {
656 println!("[agent] {}", response_text);
657 } else {
658 println!("{}", response_text);
659 }
660 io::stdout()
661 .flush()
662 .map_err(|e| KowalskiError::Server(e.to_string()))?;
663
664 let buffer = response_text.clone();
665 let tool_calls = crate::utils::json::extract_tool_calls(&buffer);
666
667 if !tool_calls.is_empty() {
668 let tool_call = &tool_calls[0];
669 let tool_call_key = (tool_call.name.clone(), tool_call.parameters.clone());
670 if let Some(last) = &last_tool_call
671 && *last == tool_call_key
672 {
673 debug!("Repeated tool call; breaking");
674 break;
675 }
676 last_tool_call = Some(tool_call_key.clone());
677
678 if repl_trace::repl_trace_enabled() {
679 let params = serde_json::to_string(&tool_call.parameters)
680 .unwrap_or_else(|_| "{}".to_string());
681 println!("[tool] {} {}", tool_call.name, params);
682 }
683
684 let tool_result = match self
685 .execute_tool(&tool_call.name, &tool_call.parameters)
686 .await
687 {
688 Ok(output) => output.result.to_string(),
689 Err(e) => format!("{}", e),
690 };
691
692 let tool_message = format!("Tool result for {}: {}", tool_call.name, tool_result);
693 self.add_message(conversation_id, "assistant", &tool_message)
694 .await;
695
696 current_input = format!("Based on the tool result: {}", tool_result);
697 stream_next_llm_turn = true;
698 continue;
699 }
700
701 if crate::utils::json::looks_like_tool_json_attempt(&buffer) && !tool_parse_hint_sent {
702 tool_parse_hint_sent = true;
703 warn!("Tool call JSON parse failed; requesting self-correction (non-stream)");
704 self.add_message(conversation_id, "assistant", &buffer)
705 .await;
706 const HINT: &str = "Your previous reply appeared to include a tool call but it could not be parsed as JSON. Reply with a single JSON object only: {\"name\": \"<tool_name>\", \"parameters\": { ... } } matching the available tools. No markdown fences or extra text.";
707 current_input = HINT.to_string();
708 stream_next_llm_turn = false;
709 continue;
710 }
711
712 final_response = buffer;
713 self.add_message(conversation_id, "assistant", &final_response)
714 .await;
715
716 if let Some(tool_call) = rule_based_tool_call(user_input) {
717 let tool_result_str = match self
718 .execute_tool(&tool_call.name, &tool_call.parameters)
719 .await
720 {
721 Ok(output) => output.result.to_string(),
722 Err(e) => format!("Tool execution failed: {}", e),
723 };
724 self.add_message(conversation_id, "assistant", &tool_result_str)
725 .await;
726 return Ok(tool_result_str);
727 }
728
729 break;
730 }
731
732 if iteration_count >= MAX_ITERATIONS {
733 warn!("Reached maximum iterations (stream_final)");
734 }
735
736 Ok(final_response)
737 }
738}
739
740#[async_trait]
741impl Agent for BaseAgent {
742 async fn new(config: Config) -> Result<Self, KowalskiError> {
743 crate::db::run_memory_migrations_if_configured(&config).await?;
744
745 let llm_provider = crate::llm::create_llm_provider(&config)?;
746
747 let working_memory = std::sync::Arc::new(tokio::sync::Mutex::new(WorkingMemory::new(100)))
749 as std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>;
750
751 let episodic_memory = std::sync::Arc::new(tokio::sync::Mutex::new(
752 crate::memory::episodic::EpisodicBuffer::open(&config.memory, llm_provider.clone())
753 .await?,
754 ))
755 as std::sync::Arc<tokio::sync::Mutex<dyn MemoryProvider + Send + Sync>>;
756
757 let semantic_memory =
758 crate::memory::helpers::create_semantic_memory(&config, llm_provider.clone()).await?;
759
760 Self::new(
761 config,
762 "Base Agent",
763 "A basic agent implementation",
764 llm_provider,
765 working_memory,
766 episodic_memory,
767 semantic_memory,
768 crate::tools::manager::ToolManager::new(),
769 )
770 .await
771 }
772
773 fn start_conversation(&mut self, model: &str) -> String {
774 info!("Starting conversation with model: {}", model);
775 let conversation = Conversation::new(model);
776 let id = conversation.id.clone();
777 self.conversations.insert(id.clone(), conversation);
778 id
779 }
780
781 fn get_conversation(&self, id: &str) -> Option<&Conversation> {
782 self.conversations.get(id)
783 }
784
785 fn list_conversations(&self) -> Vec<&Conversation> {
786 self.conversations.values().collect()
787 }
788
789 fn delete_conversation(&mut self, id: &str) -> bool {
790 self.conversations.remove(id).is_some()
791 }
792
793 async fn chat_with_history(
794 &mut self,
795 conversation_id: &str,
796 content: &str,
797 role: Option<Role>,
798 ) -> Result<String, KowalskiError> {
799 self.chat_with_history_with_options(conversation_id, content, role, true)
800 .await
801 }
802
803 async fn process_stream_response(
804 &mut self,
805 conversation_id: &str,
806 chunk: &[u8],
807 ) -> Result<Option<Message>, KowalskiError> {
808 BaseAgent::process_stream_response(self, conversation_id, chunk).await
809 }
810
811 async fn add_message(&mut self, conversation_id: &str, role: &str, content: &str) {
812 BaseAgent::add_message(self, conversation_id, role, content).await;
813 }
814
815 fn export_conversation(&self, id: &str) -> Result<String, KowalskiError> {
816 BaseAgent::export_conversation(self, id)
817 }
818
819 fn import_conversation(&mut self, json_str: &str) -> Result<String, KowalskiError> {
820 BaseAgent::import_conversation(self, json_str)
821 }
822
823 fn name(&self) -> &str {
824 &self.name
825 }
826
827 fn description(&self) -> &str {
828 &self.description
829 }
830
831 fn as_any(&self) -> &dyn Any {
832 self
833 }
834}
835
836impl BaseAgent {
837 pub async fn chat_with_history_with_options(
838 &mut self,
839 conversation_id: &str,
840 content: &str,
841 role: Option<Role>,
842 use_memory: bool,
843 ) -> Result<String, KowalskiError> {
844 let memory_context = self.build_memory_context(content, use_memory).await;
845
846 let conversation = self
847 .conversations
848 .get_mut(conversation_id)
849 .ok_or_else(|| KowalskiError::ConversationNotFound(conversation_id.to_string()))?;
850
851 if let Some(role) = role {
852 conversation.add_message("system", &role.get_prompt());
853
854 if let Some(audience) = role.get_audience() {
855 conversation.add_message("system", &audience.get_prompt());
856 }
857 if let Some(preset) = role.get_preset() {
858 conversation.add_message("system", &preset.get_prompt());
859 }
860 if let Some(style) = role.get_style() {
861 conversation.add_message("system", &style.get_prompt());
862 }
863 }
864
865 let fallback_context = if use_memory && memory_context.is_empty() {
866 Self::recent_conversation_context(&conversation.messages, 4)
867 } else {
868 String::new()
869 };
870
871 conversation.add_message("user", content);
873
874 let mut llm_messages = conversation.messages.clone();
877 let effective_context = if !memory_context.is_empty() {
878 memory_context
879 } else {
880 fallback_context
881 };
882 if !effective_context.is_empty() {
883 let memory_prompt = format!(
884 "Retrieved memory context (use only if relevant to the latest user request):\n--- Relevant Memories ---\n{}\n--- End Memories ---",
885 effective_context
886 );
887 let insert_at = llm_messages.len().saturating_sub(1);
888 llm_messages.insert(
889 insert_at,
890 Message {
891 role: "system".to_string(),
892 content: memory_prompt,
893 tool_calls: None,
894 },
895 );
896 }
897
898 let response = self
900 .llm_provider
901 .chat(&conversation.model, &llm_messages)
902 .await?;
903
904 Ok(response)
905 }
906
907 async fn process_stream_response(
908 &mut self,
909 _conversation_id: &str,
910 chunk: &[u8],
911 ) -> Result<Option<Message>, KowalskiError> {
912 let text = String::from_utf8(chunk.to_vec())
913 .map_err(|e| KowalskiError::Server(format!("Invalid UTF-8: {}", e)))?;
914
915 let stream_response: StreamResponse =
916 serde_json::from_str(&text).map_err(KowalskiError::Json)?;
917
918 if stream_response.done {
919 return Ok(None);
920 }
921
922 Ok(Some(stream_response.message))
923 }
924
925 async fn execute_tool(
926 &mut self,
927 tool_name: &str,
928 tool_input: &serde_json::Value,
929 ) -> Result<ToolOutput, KowalskiError> {
930 let task_type = tool_input
931 .get("task")
932 .and_then(|v| v.as_str())
933 .unwrap_or("default")
934 .to_string();
935 let content = tool_input
936 .get("content")
937 .and_then(|v| v.as_str())
938 .unwrap_or("")
939 .to_string();
940
941 let input = crate::tools::ToolInput::new(task_type, content, tool_input.clone());
942
943 self.tool_manager.execute(tool_name, input).await
944 }
945
946 async fn add_message(&mut self, conversation_id: &str, role: &str, content: &str) {
947 let now = SystemTime::now()
949 .duration_since(UNIX_EPOCH)
950 .unwrap_or_default();
951 let timestamp = now.as_secs();
952 let nanos = now.as_nanos();
953
954 let memory_unit = MemoryUnit {
955 id: format!("{}-{}-{}-{}", conversation_id, timestamp, nanos, role),
958 timestamp,
959 content: format!("[{}] {}", role, content),
960 embedding: None, };
962
963 if let Err(e) = self
965 .working_memory
966 .lock()
967 .await
968 .add(memory_unit.clone())
969 .await
970 {
971 eprintln!("Failed to add to working memory: {}", e);
972 }
973
974 if let Err(e) = self.episodic_memory.lock().await.add(memory_unit).await {
976 eprintln!("Failed to add to episodic memory: {}", e);
977 }
978
979 if let Some(conversation) = self.conversations.get_mut(conversation_id) {
980 conversation.add_message(role, content);
981 }
982 }
983
984 fn export_conversation(&self, id: &str) -> Result<String, KowalskiError> {
985 let conversation = self
986 .conversations
987 .get(id)
988 .ok_or_else(|| KowalskiError::ConversationNotFound(id.to_string()))?;
989 serde_json::to_string(conversation).map_err(KowalskiError::Json)
990 }
991
992 fn import_conversation(&mut self, json_str: &str) -> Result<String, KowalskiError> {
993 let conversation: crate::conversation::Conversation =
994 serde_json::from_str(json_str).map_err(KowalskiError::Json)?;
995 let id = conversation.id.clone();
996 self.conversations.insert(id.clone(), conversation);
997 Ok(id)
998 }
999}
1000
1001#[async_trait]
1002pub trait MessageHandler: Send + Sync {
1003 type Message;
1004 type Error;
1005
1006 async fn handle_message(&mut self, message: Self::Message) -> Result<(), Self::Error>;
1007}
1008
1009fn rule_based_tool_call(user_input: &str) -> Option<ToolCall> {
1010 let input = user_input.to_lowercase();
1011 if input.contains("list")
1012 && input.contains("directory")
1013 && let Some(path) = input.split_whitespace().find(|w| w.starts_with('/'))
1014 {
1015 return Some(ToolCall {
1016 name: "fs_tool".to_string(),
1017 parameters: json!({ "task": "list_dir", "path": path }),
1018 reasoning: Some("Rule-based: user asked to list a directory".to_string()),
1019 });
1020 }
1021 if input.contains("first 10 lines")
1022 && input.contains(".csv")
1023 && let Some(path) = input.split_whitespace().find(|w| w.ends_with(".csv"))
1024 {
1025 return Some(ToolCall {
1026 name: "fs_tool".to_string(),
1027 parameters: json!({ "task": "get_file_first_lines", "path": path, "num_lines": 10 }),
1028 reasoning: Some("Rule-based: user asked for first 10 lines of a CSV".to_string()),
1029 });
1030 }
1031 None
1033}