1use crate::agent::ToolUse;
6use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
7use crate::event_stream::ChatEvent;
8use crate::event_stream::s3_sink::S3Sink;
9use crate::provider::{Message, Usage};
10use crate::rlm::router::AutoProcessContext;
11use crate::rlm::{RlmChunker, RlmConfig, RlmRouter, RoutingContext};
12use crate::tool::ToolRegistry;
13use anyhow::Result;
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::path::PathBuf;
18use std::sync::Arc;
19use tokio::fs;
20use uuid::Uuid;
21
22use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
23
/// An image the user attached to a prompt; forwarded to the provider as a
/// `ContentPart::Image` (see `prompt_with_events_and_images`).
#[derive(Debug, Clone)]
pub struct ImageAttachment {
    // Image payload; presumably a `data:` URL with base64 content — TODO confirm at call sites.
    pub data_url: String,
    // MIME type of the image, when the caller knows it.
    pub mime_type: Option<String>,
}
32
/// True for tools that require a live human answer ("question"), which the
/// non-interactive session loop blocks and replaces with an error tool result.
fn is_interactive_tool(tool_name: &str) -> bool {
    tool_name == "question"
}
36
/// Picks a provider from `providers` according to a fixed preference order,
/// falling back to the first available provider when none of the preferred
/// names are present. Returns `None` only for an empty slice.
fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
    // Preference order, best first.
    const PREFERRED: [&str; 9] = [
        "zai",
        "openai",
        "github-copilot",
        "anthropic",
        "minimax",
        "openrouter",
        "novita",
        "moonshotai",
        "google",
    ];
    PREFERRED
        .iter()
        .copied()
        .find(|candidate| providers.contains(candidate))
        .or_else(|| providers.first().copied())
}
58
/// Whether this model family works best at temperature 1.0 instead of the
/// default 0.7 (matched case-insensitively on a substring of the model id).
fn prefers_temperature_one(model: &str) -> bool {
    let lower = model.to_ascii_lowercase();
    ["kimi-k2", "glm-", "minimax"]
        .iter()
        .any(|marker| lower.contains(*marker))
}
63
/// Approximate context-window size (in tokens) for a model id, matched
/// case-insensitively by substring. First matching entry wins, so ordering
/// in the table is significant; unknown models default to 128k.
fn context_window_for_model(model: &str) -> usize {
    const DEFAULT_WINDOW: usize = 128_000;
    // (substring, window) pairs checked in priority order.
    const WINDOWS: &[(&str, usize)] = &[
        ("kimi-k2", 256_000),
        ("glm-5", 200_000),
        ("glm5", 200_000),
        ("gpt-4o", 128_000),
        ("gpt-5", 256_000),
        ("claude", 200_000),
        ("gemini", 1_000_000),
        ("minimax", 256_000),
        ("m2.5", 256_000),
        ("qwen", 131_072),
    ];
    let lower = model.to_ascii_lowercase();
    WINDOWS
        .iter()
        .find(|(marker, _)| lower.contains(*marker))
        .map(|&(_, window)| window)
        .unwrap_or(DEFAULT_WINDOW)
}
87
/// A persisted chat session: the full message transcript plus bookkeeping,
/// serialized to disk as pretty-printed JSON (see `save`/`load`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    // UUID v4 string; also used as the on-disk file/directory name stem.
    pub id: String,
    // Human-readable title; generated lazily on first prompt when still None.
    pub title: Option<String>,
    pub created_at: DateTime<Utc>,
    // Bumped on every `add_message`.
    pub updated_at: DateTime<Utc>,
    // Conversation history replayed to the provider on each agent step.
    pub messages: Vec<Message>,
    // NOTE(review): not populated anywhere in this chunk — verify writers elsewhere.
    pub tool_uses: Vec<ToolUse>,
    // NOTE(review): accumulated usage; not updated in this chunk — verify writers elsewhere.
    pub usage: Usage,
    // Agent name driving the session; defaults to "build" in `new`.
    pub agent: String,
    pub metadata: SessionMetadata,
    // In-process event bus for thinking/tool notifications; runtime-only,
    // never serialized (hence `serde(skip)`), attached via `with_bus`.
    #[serde(skip)]
    pub bus: Option<Arc<crate::bus::AgentBus>>,
}
104
/// Per-session settings persisted alongside the transcript.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionMetadata {
    // Workspace the session was started in; used by `last_for_directory`
    // to match sessions to a workspace (compared after canonicalization).
    pub directory: Option<PathBuf>,
    // Optional model override, parsed with `parse_model_string`; may be
    // "provider/model", a bare provider name, or a bare model id.
    pub model: Option<String>,
    // NOTE(review): sharing fields are not read in this chunk — semantics per share feature.
    pub shared: bool,
    pub share_url: Option<String>,
}
112
113impl Session {
114 fn default_model_for_provider(provider: &str) -> String {
115 match provider {
116 "moonshotai" => "kimi-k2.5".to_string(),
117 "anthropic" => "claude-sonnet-4-20250514".to_string(),
118 "minimax" => "MiniMax-M2.5".to_string(),
119 "openai" => "gpt-4o".to_string(),
120 "google" => "gemini-2.5-pro".to_string(),
121 "zhipuai" | "zai" => "glm-5".to_string(),
122 "openrouter" => "z-ai/glm-5".to_string(),
124 "novita" => "qwen/qwen3-coder-next".to_string(),
125 "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini".to_string(),
126 _ => "glm-5".to_string(),
127 }
128 }
129
130 pub async fn new() -> Result<Self> {
132 let id = Uuid::new_v4().to_string();
133 let now = Utc::now();
134
135 Ok(Self {
136 id,
137 title: None,
138 created_at: now,
139 updated_at: now,
140 messages: Vec::new(),
141 tool_uses: Vec::new(),
142 usage: Usage::default(),
143 agent: "build".to_string(),
144 metadata: SessionMetadata {
145 directory: Some(std::env::current_dir()?),
146 ..Default::default()
147 },
148 bus: None,
149 })
150 }
151
    /// Builder-style setter: attaches the in-process agent bus so the prompt
    /// loop can publish thinking/tool events. The bus is runtime-only and is
    /// never persisted (the field is `serde(skip)`).
    pub fn with_bus(mut self, bus: Arc<crate::bus::AgentBus>) -> Self {
        self.bus = Some(bus);
        self
    }
157
158 pub async fn load(id: &str) -> Result<Self> {
160 let path = Self::session_path(id)?;
161 let content = fs::read_to_string(&path).await?;
162 let session: Session = serde_json::from_str(&content)?;
163 Ok(session)
164 }
165
166 pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
171 let sessions_dir = Self::sessions_dir()?;
172
173 if !sessions_dir.exists() {
174 anyhow::bail!("No sessions found");
175 }
176
177 let mut entries: Vec<tokio::fs::DirEntry> = Vec::new();
178 let mut read_dir = fs::read_dir(&sessions_dir).await?;
179 while let Some(entry) = read_dir.next_entry().await? {
180 entries.push(entry);
181 }
182
183 if entries.is_empty() {
184 anyhow::bail!("No sessions found");
185 }
186
187 entries.sort_by_key(|e| {
190 std::cmp::Reverse(
191 std::fs::metadata(e.path())
192 .ok()
193 .and_then(|m| m.modified().ok())
194 .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
195 )
196 });
197
198 let canonical_workspace =
199 workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
200
201 for entry in &entries {
202 let content: String = fs::read_to_string(entry.path()).await?;
203 if let Ok(session) = serde_json::from_str::<Session>(&content) {
204 if let Some(ref ws) = canonical_workspace {
206 if let Some(ref dir) = session.metadata.directory {
207 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
208 if &canonical_dir == ws {
209 return Ok(session);
210 }
211 }
212 continue;
213 }
214 return Ok(session);
215 }
216 }
217
218 anyhow::bail!("No sessions found")
219 }
220
    /// Returns the most recently modified session regardless of workspace;
    /// equivalent to `last_for_directory(None)`.
    pub async fn last() -> Result<Self> {
        Self::last_for_directory(None).await
    }
225
226 pub async fn save(&self) -> Result<()> {
228 let path = Self::session_path(&self.id)?;
229
230 if let Some(parent) = path.parent() {
231 fs::create_dir_all(parent).await?;
232 }
233
234 let content = serde_json::to_string_pretty(self)?;
235 fs::write(&path, content).await?;
236
237 Ok(())
238 }
239
    /// Appends a message to the transcript and bumps `updated_at`.
    /// Does not persist; callers must invoke `save` separately.
    pub fn add_message(&mut self, message: Message) {
        self.messages.push(message);
        self.updated_at = Utc::now();
    }
245
    /// Runs one user prompt through the full agent loop: selects a provider
    /// and model, appends the user message (compressing it via RLM when it
    /// is very large), then alternates LLM completions and tool executions
    /// for up to `max_steps` steps. Persists the session afterwards and
    /// returns the concatenated assistant text.
    ///
    /// # Errors
    /// Fails when no provider is configured, when a provider call or title
    /// generation fails, or when saving the session to disk fails.
    pub async fn prompt(&mut self, message: &str) -> Result<SessionResult> {
        use crate::provider::{
            CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
        };

        // Providers are discovered from HashiCorp Vault-backed credentials.
        let registry = ProviderRegistry::from_vault().await?;

        let providers = registry.list();
        if providers.is_empty() {
            anyhow::bail!(
                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
            );
        }

        tracing::info!("Available providers: {:?}", providers);

        // Resolve (provider, model) from metadata. Three cases:
        //  - "provider/model" → both known ("zhipuai" is aliased to "zai");
        //  - a bare string that names a known provider → provider only;
        //  - anything else → treated as a bare model id, provider chosen below.
        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
            let (prov, model) = parse_model_string(model_str);
            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
            if prov.is_some() {
                (prov.map(|s| s.to_string()), model.to_string())
            } else if providers.contains(&model) {
                (Some(model.to_string()), String::new())
            } else {
                (None, model.to_string())
            }
        } else {
            (None, String::new())
        };

        // Fall back to the preference-ordered default when the requested
        // provider is absent from the registry.
        let selected_provider = provider_name
            .as_deref()
            .filter(|p| providers.contains(p))
            .or_else(|| choose_default_provider(providers.as_slice()))
            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;

        let provider = registry
            .get(selected_provider)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

        self.add_message(Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: message.to_string(),
            }],
        });

        // Lazily generate a session title on the first prompt.
        if self.title.is_none() {
            self.generate_title().await?;
        }

        let model = if !model_id.is_empty() {
            model_id
        } else {
            Self::default_model_for_provider(selected_provider)
        };

        // RLM guard: if the raw user message alone would consume more than
        // 35% of the model's context window, compress it before the loop.
        {
            let ctx_window = context_window_for_model(&model);
            let msg_tokens = RlmChunker::estimate_tokens(message);
            let threshold = (ctx_window as f64 * 0.35) as usize;
            if msg_tokens > threshold {
                tracing::info!(
                    msg_tokens,
                    threshold,
                    ctx_window,
                    "RLM: User message exceeds context threshold, compressing"
                );
                let auto_ctx = AutoProcessContext {
                    tool_id: "session_context",
                    tool_args: serde_json::json!({}),
                    session_id: &self.id,
                    abort: None,
                    on_progress: None,
                    provider: Arc::clone(&provider),
                    model: model.clone(),
                };
                let rlm_config = RlmConfig::default();
                match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
                    Ok(result) => {
                        tracing::info!(
                            input_tokens = result.stats.input_tokens,
                            output_tokens = result.stats.output_tokens,
                            "RLM: User message compressed"
                        );
                        // Rewrite the just-pushed user message in place with
                        // the compressed form plus a short original prefix.
                        if let Some(last) = self.messages.last_mut() {
                            last.content = vec![ContentPart::Text {
                                text: format!(
                                    "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
                                    msg_tokens,
                                    result.processed,
                                    message.chars().take(500).collect::<String>()
                                ),
                            }];
                        }
                    }
                    Err(e) => {
                        // Fallback: plain truncation when compression fails.
                        tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
                        let max_chars = threshold * 4;
                        let truncated = RlmChunker::compress(message, max_chars / 4, None);
                        if let Some(last) = self.messages.last_mut() {
                            last.content = vec![ContentPart::Text { text: truncated }];
                        }
                    }
                }
            }
        }

        // Tools offered to the model; interactive ones (e.g. "question") are
        // filtered out because this loop cannot wait for a human answer.
        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|tool| !is_interactive_tool(&tool.name))
            .collect();

        // Some model families behave best at temperature 1.0.
        let temperature = if prefers_temperature_one(&model) {
            Some(1.0)
        } else {
            Some(0.7)
        };

        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
        tracing::info!("Available tools: {}", tool_definitions.len());

        // NOTE(review): assumed true for all providers here — confirm if a
        // non-tool-capable model can ever be selected.
        let model_supports_tools = true;

        let cwd = self
            .metadata
            .directory
            .clone()
            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);

        // Hard cap on completion/tool iterations to avoid runaway loops.
        let max_steps = 50;
        let mut final_output = String::new();

        // Optional FunctionGemma-based reformatter for malformed tool calls;
        // init failure just disables it.
        let tool_router: Option<ToolCallRouter> = {
            let cfg = ToolRouterConfig::from_env();
            match ToolCallRouter::from_config(&cfg) {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                    None
                }
            }
        };

        for step in 1..=max_steps {
            tracing::info!(step = step, "Agent step starting");

            // Rebuild the request each step: system prompt + full transcript.
            let mut messages = vec![Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: system_prompt.clone(),
                }],
            }];
            messages.extend(self.messages.clone());

            let request = CompletionRequest {
                messages,
                tools: tool_definitions.clone(),
                model: model.clone(),
                temperature,
                top_p: None,
                max_tokens: Some(8192),
                stop: Vec::new(),
            };

            let response = provider.complete(request).await?;

            // Let the tool router repair tool-call formatting if enabled.
            let response = if let Some(ref router) = tool_router {
                router
                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
                    .await
            } else {
                response
            };

            crate::telemetry::TOKEN_USAGE.record_model_usage(
                &model,
                response.usage.prompt_tokens as u64,
                response.usage.completion_tokens as u64,
            );

            // Collect (id, name, parsed-args) for every tool call in the
            // response; unparseable argument JSON degrades to `{}`.
            let tool_calls: Vec<(String, String, serde_json::Value)> = response
                .message
                .content
                .iter()
                .filter_map(|part| {
                    if let ContentPart::ToolCall {
                        id,
                        name,
                        arguments,
                        ..
                    } = part
                    {
                        let args: serde_json::Value =
                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                        Some((id.clone(), name.clone(), args))
                    } else {
                        None
                    }
                })
                .collect();

            // Accumulate assistant text and forward "thinking" to the bus.
            for part in &response.message.content {
                match part {
                    ContentPart::Text { text } if !text.is_empty() => {
                        final_output.push_str(text);
                        final_output.push('\n');
                    }
                    ContentPart::Thinking { text } if !text.is_empty() => {
                        if let Some(ref bus) = self.bus {
                            let handle = bus.handle(&self.agent);
                            handle.send(
                                format!("agent.{}.thinking", self.agent),
                                crate::bus::BusMessage::AgentThinking {
                                    agent_id: self.agent.clone(),
                                    thinking: text.clone(),
                                    step,
                                },
                            );
                        }
                    }
                    _ => {}
                }
            }

            // No tool calls → the model is done; record the reply and stop.
            if tool_calls.is_empty() {
                self.add_message(response.message.clone());
                break;
            }

            self.add_message(response.message.clone());

            tracing::info!(
                step = step,
                num_tools = tool_calls.len(),
                "Executing tool calls"
            );

            for (tool_id, tool_name, tool_input) in tool_calls {
                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

                // Announce the request on the bus before executing.
                if let Some(ref bus) = self.bus {
                    let handle = bus.handle(&self.agent);
                    handle.send(
                        format!("agent.{}.tool.request", self.agent),
                        crate::bus::BusMessage::ToolRequest {
                            request_id: tool_id.clone(),
                            agent_id: self.agent.clone(),
                            tool_name: tool_name.clone(),
                            arguments: tool_input.clone(),
                        },
                    );
                }

                // Interactive tools cannot run here; answer with an error
                // tool result so the model re-asks in plain text.
                if is_interactive_tool(&tool_name) {
                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                    self.add_message(Message {
                        role: Role::Tool,
                        content: vec![ContentPart::ToolResult {
                            tool_call_id: tool_id,
                            content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
                        }],
                    });
                    continue;
                }

                // Execute the tool; every outcome (ok/err/unknown) is audited.
                let exec_start = std::time::Instant::now();
                let content = if let Some(tool) = tool_registry.get(&tool_name) {
                    match tool.execute(tool_input.clone()).await {
                        Ok(result) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                            if let Some(audit) = try_audit_log() {
                                audit.log_with_correlation(
                                    AuditCategory::ToolExecution,
                                    format!("tool:{}", tool_name),
                                    if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
                                    None,
                                    Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
                                    None, None, None, Some(self.id.clone()), ).await;
                            }
                            result.output
                        }
                        Err(e) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                            if let Some(audit) = try_audit_log() {
                                audit.log_with_correlation(
                                    AuditCategory::ToolExecution,
                                    format!("tool:{}", tool_name),
                                    AuditOutcome::Failure,
                                    None,
                                    Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
                                    None, None, None, Some(self.id.clone()), ).await;
                            }
                            format!("Error: {}", e)
                        }
                    }
                } else {
                    tracing::warn!(tool = %tool_name, "Tool not found");
                    if let Some(audit) = try_audit_log() {
                        audit
                            .log_with_correlation(
                                AuditCategory::ToolExecution,
                                format!("tool:{}", tool_name),
                                AuditOutcome::Failure,
                                None,
                                Some(json!({ "error": "unknown_tool" })),
                                None, None, None, Some(self.id.clone()), )
                            .await;
                    }
                    format!("Error: Unknown tool '{}'", tool_name)
                };

                let duration_ms = exec_start.elapsed().as_millis() as u64;
                // Heuristic: all failure paths above produce "Error:"-prefixed
                // output, so the prefix doubles as the success flag.
                let success = !content.starts_with("Error:");

                if let Some(ref bus) = self.bus {
                    let handle = bus.handle(&self.agent);
                    handle.send(
                        format!("agent.{}.tool.output", self.agent),
                        crate::bus::BusMessage::ToolOutputFull {
                            agent_id: self.agent.clone(),
                            tool_name: tool_name.clone(),
                            output: content.clone(),
                            success,
                            step,
                        },
                    );
                }

                // Best-effort append of a JSONL event record in a detached
                // task; write failures are deliberately ignored.
                if let Some(base_dir) = Self::event_stream_path() {
                    let workspace = std::env::var("PWD")
                        .map(PathBuf::from)
                        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
                    let event = ChatEvent::tool_result(
                        workspace,
                        self.id.clone(),
                        &tool_name,
                        success,
                        duration_ms,
                        &content,
                        self.messages.len() as u64,
                    );
                    let event_json = event.to_json();
                    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
                    let seq = self.messages.len() as u64;
                    let filename = format!(
                        "{}-chat-events-{:020}-{:020}.jsonl",
                        timestamp,
                        seq * 10000,
                        (seq + 1) * 10000
                    );
                    let event_path = base_dir.join(&self.id).join(filename);

                    let event_path_clone = event_path;
                    tokio::spawn(async move {
                        if let Some(parent) = event_path_clone.parent() {
                            let _ = tokio::fs::create_dir_all(parent).await;
                        }
                        if let Ok(mut file) = tokio::fs::OpenOptions::new()
                            .create(true)
                            .append(true)
                            .open(&event_path_clone)
                            .await
                        {
                            use tokio::io::AsyncWriteExt;
                            let _ = file.write_all(event_json.as_bytes()).await;
                            let _ = file.write_all(b"\n").await;
                        }
                    });
                }

                // RLM guard for tool output: route oversized output through
                // auto-processing, falling back to smart truncation.
                let content = {
                    let ctx_window = context_window_for_model(&model);
                    // Rough transcript size: chars/4 ≈ tokens.
                    let total_chars: usize = self
                        .messages
                        .iter()
                        .map(|m| {
                            m.content
                                .iter()
                                .map(|p| match p {
                                    ContentPart::Text { text } => text.len(),
                                    ContentPart::ToolResult { content, .. } => content.len(),
                                    _ => 0,
                                })
                                .sum::<usize>()
                        })
                        .sum();
                    let current_tokens = total_chars / 4; let routing_ctx = RoutingContext {
                        tool_id: tool_name.clone(),
                        session_id: self.id.clone(),
                        call_id: Some(tool_id.clone()),
                        model_context_limit: ctx_window,
                        current_context_tokens: Some(current_tokens),
                    };
                    let rlm_config = RlmConfig::default();
                    let routing = RlmRouter::should_route(&content, &routing_ctx, &rlm_config);
                    if routing.should_route {
                        tracing::info!(
                            tool = %tool_name,
                            reason = %routing.reason,
                            estimated_tokens = routing.estimated_tokens,
                            "RLM: Routing large tool output"
                        );
                        let auto_ctx = AutoProcessContext {
                            tool_id: &tool_name,
                            tool_args: tool_input.clone(),
                            session_id: &self.id,
                            abort: None,
                            on_progress: None,
                            provider: Arc::clone(&provider),
                            model: model.clone(),
                        };
                        match RlmRouter::auto_process(&content, auto_ctx, &rlm_config).await {
                            Ok(result) => {
                                tracing::info!(
                                    input_tokens = result.stats.input_tokens,
                                    output_tokens = result.stats.output_tokens,
                                    iterations = result.stats.iterations,
                                    "RLM: Processing complete"
                                );
                                result.processed
                            }
                            Err(e) => {
                                tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
                                let (truncated, _, _) = RlmRouter::smart_truncate(
                                    &content,
                                    &tool_name,
                                    &tool_input,
                                    ctx_window / 4,
                                );
                                truncated
                            }
                        }
                    } else {
                        content
                    }
                };

                self.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
            }
        }

        self.save().await?;

        // Fire-and-forget archival of the event stream to S3/R2.
        self.archive_event_stream_to_s3().await;

        Ok(SessionResult {
            text: final_output.trim().to_string(),
            session_id: self.id.clone(),
        })
    }
769
770 async fn archive_event_stream_to_s3(&self) {
772 if !S3Sink::is_configured() {
774 return;
775 }
776
777 let Some(base_dir) = Self::event_stream_path() else {
778 return;
779 };
780
781 let session_event_dir = base_dir.join(&self.id);
782 if !session_event_dir.exists() {
783 return;
784 }
785
786 let Ok(sink) = S3Sink::from_env().await else {
788 tracing::warn!("Failed to create S3 sink for archival");
789 return;
790 };
791
792 let session_id = self.id.clone();
794 tokio::spawn(async move {
795 if let Ok(mut entries) = tokio::fs::read_dir(&session_event_dir).await {
796 while let Ok(Some(entry)) = entries.next_entry().await {
797 let path = entry.path();
798 if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
799 match sink.upload_file(&path, &session_id).await {
800 Ok(url) => {
801 tracing::info!(url = %url, "Archived event stream to S3/R2");
802 }
803 Err(e) => {
804 tracing::warn!(error = %e, "Failed to archive event file to S3");
805 }
806 }
807 }
808 }
809 }
810 });
811 }
812
    /// Event-streaming variant of the prompt loop with no image attachments;
    /// simply delegates to `prompt_with_events_and_images` with an empty
    /// image list.
    pub async fn prompt_with_events(
        &mut self,
        message: &str,
        event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
        registry: std::sync::Arc<crate::provider::ProviderRegistry>,
    ) -> Result<SessionResult> {
        self.prompt_with_events_and_images(message, Vec::new(), event_tx, registry)
            .await
    }
828
829 pub async fn prompt_with_events_and_images(
834 &mut self,
835 message: &str,
836 images: Vec<ImageAttachment>,
837 event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
838 registry: std::sync::Arc<crate::provider::ProviderRegistry>,
839 ) -> Result<SessionResult> {
840 use crate::provider::{CompletionRequest, ContentPart, Role, parse_model_string};
841
842 let _ = event_tx.send(SessionEvent::Thinking).await;
843
844 let providers = registry.list();
845 if providers.is_empty() {
846 anyhow::bail!(
847 "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
848 );
849 }
850 tracing::info!("Available providers: {:?}", providers);
851
852 let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
854 let (prov, model) = parse_model_string(model_str);
855 let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
856 if prov.is_some() {
857 (prov.map(|s| s.to_string()), model.to_string())
858 } else if providers.contains(&model) {
859 (Some(model.to_string()), String::new())
860 } else {
861 (None, model.to_string())
862 }
863 } else {
864 (None, String::new())
865 };
866
867 let selected_provider = provider_name
869 .as_deref()
870 .filter(|p| providers.contains(p))
871 .or_else(|| choose_default_provider(providers.as_slice()))
872 .ok_or_else(|| anyhow::anyhow!("No providers available"))?;
873
874 let provider = registry
875 .get(selected_provider)
876 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
877
878 let mut content_parts = vec![ContentPart::Text {
880 text: message.to_string(),
881 }];
882
883 for img in &images {
885 content_parts.push(ContentPart::Image {
886 url: img.data_url.clone(),
887 mime_type: img.mime_type.clone(),
888 });
889 }
890
891 if !images.is_empty() {
892 tracing::info!(
893 image_count = images.len(),
894 "Adding {} image attachment(s) to user message",
895 images.len()
896 );
897 }
898
899 self.add_message(Message {
901 role: Role::User,
902 content: content_parts,
903 });
904
905 if self.title.is_none() {
907 self.generate_title().await?;
908 }
909
910 let model = if !model_id.is_empty() {
912 model_id
913 } else {
914 Self::default_model_for_provider(selected_provider)
915 };
916
917 {
919 let ctx_window = context_window_for_model(&model);
920 let msg_tokens = RlmChunker::estimate_tokens(message);
921 let threshold = (ctx_window as f64 * 0.35) as usize;
922 if msg_tokens > threshold {
923 tracing::info!(
924 msg_tokens,
925 threshold,
926 ctx_window,
927 "RLM: User message exceeds context threshold, compressing"
928 );
929 let auto_ctx = AutoProcessContext {
930 tool_id: "session_context",
931 tool_args: serde_json::json!({}),
932 session_id: &self.id,
933 abort: None,
934 on_progress: None,
935 provider: Arc::clone(&provider),
936 model: model.clone(),
937 };
938 let rlm_config = RlmConfig::default();
939 match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
940 Ok(result) => {
941 tracing::info!(
942 input_tokens = result.stats.input_tokens,
943 output_tokens = result.stats.output_tokens,
944 "RLM: User message compressed"
945 );
946 if let Some(last) = self.messages.last_mut() {
947 last.content = vec![ContentPart::Text {
948 text: format!(
949 "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
950 msg_tokens,
951 result.processed,
952 message.chars().take(500).collect::<String>()
953 ),
954 }];
955 }
956 }
957 Err(e) => {
958 tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
959 let max_chars = threshold * 4;
960 let truncated = RlmChunker::compress(message, max_chars / 4, None);
961 if let Some(last) = self.messages.last_mut() {
962 last.content = vec![ContentPart::Text { text: truncated }];
963 }
964 }
965 }
966 }
967 }
968
969 let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
971 let tool_definitions: Vec<_> = tool_registry
972 .definitions()
973 .into_iter()
974 .filter(|tool| !is_interactive_tool(&tool.name))
975 .collect();
976
977 let temperature = if prefers_temperature_one(&model) {
978 Some(1.0)
979 } else {
980 Some(0.7)
981 };
982
983 tracing::info!("Using model: {} via provider: {}", model, selected_provider);
984 tracing::info!("Available tools: {}", tool_definitions.len());
985
986 let model_supports_tools = true;
990
991 let cwd = std::env::var("PWD")
993 .map(std::path::PathBuf::from)
994 .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
995 let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);
996
997 let mut final_output = String::new();
998 let max_steps = 50;
999
1000 let tool_router: Option<ToolCallRouter> = {
1003 let cfg = ToolRouterConfig::from_env();
1004 match ToolCallRouter::from_config(&cfg) {
1005 Ok(r) => r,
1006 Err(e) => {
1007 tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
1008 None
1009 }
1010 }
1011 };
1012
1013 for step in 1..=max_steps {
1014 tracing::info!(step = step, "Agent step starting");
1015 let _ = event_tx.send(SessionEvent::Thinking).await;
1016
1017 let mut messages = vec![Message {
1019 role: Role::System,
1020 content: vec![ContentPart::Text {
1021 text: system_prompt.clone(),
1022 }],
1023 }];
1024 messages.extend(self.messages.clone());
1025
1026 let request = CompletionRequest {
1027 messages,
1028 tools: tool_definitions.clone(),
1029 model: model.clone(),
1030 temperature,
1031 top_p: None,
1032 max_tokens: Some(8192),
1033 stop: Vec::new(),
1034 };
1035
1036 let llm_start = std::time::Instant::now();
1037 let response = provider.complete(request).await?;
1038 let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
1039
1040 let response = if let Some(ref router) = tool_router {
1044 router
1045 .maybe_reformat(response, &tool_definitions, model_supports_tools)
1046 .await
1047 } else {
1048 response
1049 };
1050
1051 crate::telemetry::TOKEN_USAGE.record_model_usage(
1052 &model,
1053 response.usage.prompt_tokens as u64,
1054 response.usage.completion_tokens as u64,
1055 );
1056
1057 let _ = event_tx
1059 .send(SessionEvent::UsageReport {
1060 prompt_tokens: response.usage.prompt_tokens,
1061 completion_tokens: response.usage.completion_tokens,
1062 duration_ms: llm_duration_ms,
1063 model: model.clone(),
1064 })
1065 .await;
1066
1067 let tool_calls: Vec<(String, String, serde_json::Value)> = response
1069 .message
1070 .content
1071 .iter()
1072 .filter_map(|part| {
1073 if let ContentPart::ToolCall {
1074 id,
1075 name,
1076 arguments,
1077 ..
1078 } = part
1079 {
1080 let args: serde_json::Value =
1081 serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
1082 Some((id.clone(), name.clone(), args))
1083 } else {
1084 None
1085 }
1086 })
1087 .collect();
1088
1089 let mut thinking_text = String::new();
1092 let mut step_text = String::new();
1093 for part in &response.message.content {
1094 match part {
1095 ContentPart::Thinking { text } => {
1096 if !text.is_empty() {
1097 thinking_text.push_str(text);
1098 thinking_text.push('\n');
1099 }
1100 }
1101 ContentPart::Text { text } => {
1102 if !text.is_empty() {
1103 step_text.push_str(text);
1104 step_text.push('\n');
1105 }
1106 }
1107 _ => {}
1108 }
1109 }
1110
1111 if !thinking_text.trim().is_empty() {
1113 let _ = event_tx
1114 .send(SessionEvent::ThinkingComplete(
1115 thinking_text.trim().to_string(),
1116 ))
1117 .await;
1118 if let Some(ref bus) = self.bus {
1119 let handle = bus.handle(&self.agent);
1120 handle.send(
1121 format!("agent.{}.thinking", self.agent),
1122 crate::bus::BusMessage::AgentThinking {
1123 agent_id: self.agent.clone(),
1124 thinking: thinking_text.trim().to_string(),
1125 step,
1126 },
1127 );
1128 }
1129 }
1130
1131 if !step_text.trim().is_empty() {
1134 let trimmed = step_text.trim().to_string();
1135 let _ = event_tx
1136 .send(SessionEvent::TextChunk(trimmed.clone()))
1137 .await;
1138 let _ = event_tx.send(SessionEvent::TextComplete(trimmed)).await;
1139 final_output.push_str(&step_text);
1140 }
1141
1142 if tool_calls.is_empty() {
1143 self.add_message(response.message.clone());
1144 break;
1145 }
1146
1147 self.add_message(response.message.clone());
1148
1149 tracing::info!(
1150 step = step,
1151 num_tools = tool_calls.len(),
1152 "Executing tool calls"
1153 );
1154
1155 for (tool_id, tool_name, tool_input) in tool_calls {
1157 let args_str = serde_json::to_string(&tool_input).unwrap_or_default();
1158 let _ = event_tx
1159 .send(SessionEvent::ToolCallStart {
1160 name: tool_name.clone(),
1161 arguments: args_str,
1162 })
1163 .await;
1164
1165 tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
1166
1167 if let Some(ref bus) = self.bus {
1169 let handle = bus.handle(&self.agent);
1170 handle.send(
1171 format!("agent.{}.tool.request", self.agent),
1172 crate::bus::BusMessage::ToolRequest {
1173 request_id: tool_id.clone(),
1174 agent_id: self.agent.clone(),
1175 tool_name: tool_name.clone(),
1176 arguments: tool_input.clone(),
1177 },
1178 );
1179 }
1180
1181 if is_interactive_tool(&tool_name) {
1182 tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
1183 let content = "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string();
1184 let _ = event_tx
1185 .send(SessionEvent::ToolCallComplete {
1186 name: tool_name.clone(),
1187 output: content.clone(),
1188 success: false,
1189 })
1190 .await;
1191 self.add_message(Message {
1192 role: Role::Tool,
1193 content: vec![ContentPart::ToolResult {
1194 tool_call_id: tool_id,
1195 content,
1196 }],
1197 });
1198 continue;
1199 }
1200
1201 let exec_start = std::time::Instant::now();
1202 let (content, success) = if let Some(tool) = tool_registry.get(&tool_name) {
1203 match tool.execute(tool_input.clone()).await {
1204 Ok(result) => {
1205 let duration_ms = exec_start.elapsed().as_millis() as u64;
1206 tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
1207 if let Some(audit) = try_audit_log() {
1208 audit.log_with_correlation(
1209 AuditCategory::ToolExecution,
1210 format!("tool:{}", tool_name),
1211 if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
1212 None,
1213 Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
1214 None, None, None, Some(self.id.clone()), ).await;
1219 }
1220 (result.output, result.success)
1221 }
1222 Err(e) => {
1223 let duration_ms = exec_start.elapsed().as_millis() as u64;
1224 tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
1225 if let Some(audit) = try_audit_log() {
1226 audit.log_with_correlation(
1227 AuditCategory::ToolExecution,
1228 format!("tool:{}", tool_name),
1229 AuditOutcome::Failure,
1230 None,
1231 Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
1232 None, None, None, Some(self.id.clone()), ).await;
1237 }
1238 (format!("Error: {}", e), false)
1239 }
1240 }
1241 } else {
1242 tracing::warn!(tool = %tool_name, "Tool not found");
1243 if let Some(audit) = try_audit_log() {
1244 audit
1245 .log_with_correlation(
1246 AuditCategory::ToolExecution,
1247 format!("tool:{}", tool_name),
1248 AuditOutcome::Failure,
1249 None,
1250 Some(json!({ "error": "unknown_tool" })),
1251 None, None, None, Some(self.id.clone()), )
1256 .await;
1257 }
1258 (format!("Error: Unknown tool '{}'", tool_name), false)
1259 };
1260
1261 let duration_ms = exec_start.elapsed().as_millis() as u64;
1263
1264 if let Some(ref bus) = self.bus {
1266 let handle = bus.handle(&self.agent);
1267 handle.send(
1268 format!("agent.{}.tool.output", self.agent),
1269 crate::bus::BusMessage::ToolOutputFull {
1270 agent_id: self.agent.clone(),
1271 tool_name: tool_name.clone(),
1272 output: content.clone(),
1273 success,
1274 step,
1275 },
1276 );
1277 }
1278
1279 if let Some(base_dir) = Self::event_stream_path() {
1283 let workspace = std::env::var("PWD")
1284 .map(PathBuf::from)
1285 .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
1286 let event = ChatEvent::tool_result(
1287 workspace,
1288 self.id.clone(),
1289 &tool_name,
1290 success,
1291 duration_ms,
1292 &content,
1293 self.messages.len() as u64,
1294 );
1295 let event_json = event.to_json();
1296 let event_size = event_json.len() as u64 + 1; let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
1302 let seq = self.messages.len() as u64;
1303 let filename = format!(
1304 "{}-chat-events-{:020}-{:020}.jsonl",
1305 timestamp,
1306 seq * 10000, (seq + 1) * 10000 );
1309 let event_path = base_dir.join(&self.id).join(filename);
1310
1311 let event_path_clone = event_path;
1313 tokio::spawn(async move {
1314 if let Some(parent) = event_path_clone.parent() {
1315 let _ = tokio::fs::create_dir_all(parent).await;
1316 }
1317 if let Ok(mut file) = tokio::fs::OpenOptions::new()
1318 .create(true)
1319 .append(true)
1320 .open(&event_path_clone)
1321 .await
1322 {
1323 use tokio::io::AsyncWriteExt;
1324 let _ = file.write_all(event_json.as_bytes()).await;
1325 let _ = file.write_all(b"\n").await;
1326 tracing::debug!(path = %event_path_clone.display(), size = event_size, "Event stream wrote");
1327 }
1328 });
1329 }
1330
1331 let _ = event_tx
1332 .send(SessionEvent::ToolCallComplete {
1333 name: tool_name.clone(),
1334 output: content.clone(),
1335 success,
1336 })
1337 .await;
1338
1339 let content = {
1341 let ctx_window = context_window_for_model(&model);
1342 let total_chars: usize = self
1343 .messages
1344 .iter()
1345 .map(|m| {
1346 m.content
1347 .iter()
1348 .map(|p| match p {
1349 ContentPart::Text { text } => text.len(),
1350 ContentPart::ToolResult { content, .. } => content.len(),
1351 _ => 0,
1352 })
1353 .sum::<usize>()
1354 })
1355 .sum();
1356 let current_tokens = total_chars / 4;
1357 let routing_ctx = RoutingContext {
1358 tool_id: tool_name.clone(),
1359 session_id: self.id.clone(),
1360 call_id: Some(tool_id.clone()),
1361 model_context_limit: ctx_window,
1362 current_context_tokens: Some(current_tokens),
1363 };
1364 let rlm_config = RlmConfig::default();
1365 let routing = RlmRouter::should_route(&content, &routing_ctx, &rlm_config);
1366 if routing.should_route {
1367 tracing::info!(
1368 tool = %tool_name,
1369 reason = %routing.reason,
1370 estimated_tokens = routing.estimated_tokens,
1371 "RLM: Routing large tool output"
1372 );
1373 let auto_ctx = AutoProcessContext {
1374 tool_id: &tool_name,
1375 tool_args: tool_input.clone(),
1376 session_id: &self.id,
1377 abort: None,
1378 on_progress: None,
1379 provider: Arc::clone(&provider),
1380 model: model.clone(),
1381 };
1382 match RlmRouter::auto_process(&content, auto_ctx, &rlm_config).await {
1383 Ok(result) => {
1384 tracing::info!(
1385 input_tokens = result.stats.input_tokens,
1386 output_tokens = result.stats.output_tokens,
1387 iterations = result.stats.iterations,
1388 "RLM: Processing complete"
1389 );
1390 result.processed
1391 }
1392 Err(e) => {
1393 tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
1394 let (truncated, _, _) = RlmRouter::smart_truncate(
1395 &content,
1396 &tool_name,
1397 &tool_input,
1398 ctx_window / 4,
1399 );
1400 truncated
1401 }
1402 }
1403 } else {
1404 content
1405 }
1406 };
1407
1408 self.add_message(Message {
1409 role: Role::Tool,
1410 content: vec![ContentPart::ToolResult {
1411 tool_call_id: tool_id,
1412 content,
1413 }],
1414 });
1415 }
1416 }
1417
1418 self.save().await?;
1419
1420 self.archive_event_stream_to_s3().await;
1422
1423 let _ = event_tx.send(SessionEvent::SessionSync(self.clone())).await;
1426 let _ = event_tx.send(SessionEvent::Done).await;
1427
1428 Ok(SessionResult {
1429 text: final_output.trim().to_string(),
1430 session_id: self.id.clone(),
1431 })
1432 }
1433
1434 pub async fn generate_title(&mut self) -> Result<()> {
1437 if self.title.is_some() {
1438 return Ok(());
1439 }
1440
1441 let first_message = self
1443 .messages
1444 .iter()
1445 .find(|m| m.role == crate::provider::Role::User);
1446
1447 if let Some(msg) = first_message {
1448 let text: String = msg
1449 .content
1450 .iter()
1451 .filter_map(|p| match p {
1452 crate::provider::ContentPart::Text { text } => Some(text.clone()),
1453 _ => None,
1454 })
1455 .collect::<Vec<_>>()
1456 .join(" ");
1457
1458 self.title = Some(truncate_with_ellipsis(&text, 47));
1460 }
1461
1462 Ok(())
1463 }
1464
1465 pub async fn regenerate_title(&mut self) -> Result<()> {
1468 let first_message = self
1470 .messages
1471 .iter()
1472 .find(|m| m.role == crate::provider::Role::User);
1473
1474 if let Some(msg) = first_message {
1475 let text: String = msg
1476 .content
1477 .iter()
1478 .filter_map(|p| match p {
1479 crate::provider::ContentPart::Text { text } => Some(text.clone()),
1480 _ => None,
1481 })
1482 .collect::<Vec<_>>()
1483 .join(" ");
1484
1485 self.title = Some(truncate_with_ellipsis(&text, 47));
1487 }
1488
1489 Ok(())
1490 }
1491
1492 pub fn set_title(&mut self, title: impl Into<String>) {
1494 self.title = Some(title.into());
1495 self.updated_at = Utc::now();
1496 }
1497
1498 pub fn clear_title(&mut self) {
1500 self.title = None;
1501 self.updated_at = Utc::now();
1502 }
1503
1504 pub async fn on_context_change(&mut self, regenerate_title: bool) -> Result<()> {
1507 self.updated_at = Utc::now();
1508
1509 if regenerate_title {
1510 self.regenerate_title().await?;
1511 }
1512
1513 Ok(())
1514 }
1515
1516 pub async fn delete(id: &str) -> Result<()> {
1518 let path = Self::session_path(id)?;
1519 if path.exists() {
1520 tokio::fs::remove_file(&path).await?;
1521 }
1522 Ok(())
1523 }
1524
1525 fn sessions_dir() -> Result<PathBuf> {
1527 crate::config::Config::data_dir()
1528 .map(|d| d.join("sessions"))
1529 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
1530 }
1531
1532 fn session_path(id: &str) -> Result<PathBuf> {
1534 Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
1535 }
1536
1537 fn event_stream_path() -> Option<PathBuf> {
1540 std::env::var("CODETETHER_EVENT_STREAM_PATH")
1541 .ok()
1542 .map(PathBuf::from)
1543 }
1544}
1545
/// Final outcome of a completed session run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionResult {
    /// The assistant's final output text, whitespace-trimmed.
    pub text: String,
    /// Id of the session that produced this result, usable for later lookup.
    pub session_id: String,
}
1552
/// Incremental events emitted on the session event channel while a run
/// is in progress.
#[derive(Debug, Clone)]
pub enum SessionEvent {
    /// The model is working; no output has arrived yet.
    Thinking,
    /// A tool call is starting, with its raw argument string.
    ToolCallStart { name: String, arguments: String },
    /// A tool call finished, carrying its full output and success flag.
    ToolCallComplete {
        name: String,
        output: String,
        success: bool,
    },
    /// A chunk of assistant text.
    TextChunk(String),
    /// The complete assistant text for a turn.
    TextComplete(String),
    /// The complete reasoning/thinking text for a turn.
    ThinkingComplete(String),
    /// Token usage and timing for one model call.
    UsageReport {
        prompt_tokens: usize,
        completion_tokens: usize,
        duration_ms: u64,
        model: String,
    },
    /// Snapshot of the entire session so consumers can sync their state.
    SessionSync(Session),
    /// The run finished.
    Done,
    /// The run failed with this error message.
    Error(String),
}
1586
1587pub async fn list_sessions() -> Result<Vec<SessionSummary>> {
1589 let sessions_dir = crate::config::Config::data_dir()
1590 .map(|d| d.join("sessions"))
1591 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
1592
1593 if !sessions_dir.exists() {
1594 return Ok(Vec::new());
1595 }
1596
1597 let mut summaries = Vec::new();
1598 let mut entries = fs::read_dir(&sessions_dir).await?;
1599
1600 while let Some(entry) = entries.next_entry().await? {
1601 let path = entry.path();
1602 if path.extension().map(|e| e == "json").unwrap_or(false) {
1603 if let Ok(content) = fs::read_to_string(&path).await {
1604 if let Ok(session) = serde_json::from_str::<Session>(&content) {
1605 summaries.push(SessionSummary {
1606 id: session.id,
1607 title: session.title,
1608 created_at: session.created_at,
1609 updated_at: session.updated_at,
1610 message_count: session.messages.len(),
1611 agent: session.agent,
1612 directory: session.metadata.directory,
1613 });
1614 }
1615 }
1616 }
1617 }
1618
1619 summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1620 Ok(summaries)
1621}
1622
1623pub async fn list_sessions_for_directory(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
1628 let all = list_sessions().await?;
1629 let canonical = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
1630 Ok(all
1631 .into_iter()
1632 .filter(|s| {
1633 s.directory
1634 .as_ref()
1635 .map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()) == canonical)
1636 .unwrap_or(false)
1637 })
1638 .collect())
1639}
1640
1641pub async fn list_sessions_paged(
1646 dir: &std::path::Path,
1647 limit: usize,
1648 offset: usize,
1649) -> Result<Vec<SessionSummary>> {
1650 let mut sessions = list_sessions_for_directory(dir).await?;
1651 sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1652 Ok(sessions.into_iter().skip(offset).take(limit).collect())
1653}
1654
/// Lightweight, listable view of a persisted session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    pub id: String,
    /// Human-readable title, if one was generated or set.
    pub title: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    /// Number of messages in the full session.
    pub message_count: usize,
    pub agent: String,
    /// Working directory recorded for the session. `serde(default)` keeps
    /// older session files that lack this field deserializable.
    #[serde(default)]
    pub directory: Option<PathBuf>,
}
1668
/// Truncate `value` to at most `max_chars` characters, appending `"..."`
/// when anything was cut off.
///
/// Operates on `char` boundaries, so multi-byte UTF-8 text is never split
/// mid-character. Strings of `max_chars` characters or fewer are returned
/// unchanged; `max_chars == 0` yields an empty string.
fn truncate_with_ellipsis(value: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // `nth(max_chars)` is the first character *beyond* the limit; its byte
    // offset is exactly where the kept prefix ends.
    match value.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}...", &value[..cut]),
        None => value.to_string(),
    }
}
1690
1691#[allow(dead_code)]
1693use futures::StreamExt;
1694
/// Drain an asynchronous stream into a `Vec`, awaiting every item.
///
/// NOTE(review): the method name collides with `futures::StreamExt::collect`;
/// calling `.collect()` on a stream with both traits in scope may be
/// ambiguous — confirm call sites disambiguate if this trait gets used.
#[allow(dead_code)]
trait AsyncCollect<T> {
    /// Consume the stream and return all items in arrival order.
    async fn collect(self) -> Vec<T>;
}
1699
1700#[allow(dead_code)]
1701impl<S, T> AsyncCollect<T> for S
1702where
1703 S: futures::Stream<Item = T> + Unpin,
1704{
1705 async fn collect(mut self) -> Vec<T> {
1706 let mut items = Vec::new();
1707 while let Some(item) = self.next().await {
1708 items.push(item);
1709 }
1710 items
1711 }
1712}