1use crate::agent::ToolUse;
6use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
7use crate::event_stream::ChatEvent;
8use crate::event_stream::s3_sink::S3Sink;
9use crate::provider::{Message, Usage};
10use crate::rlm::{RlmChunker, RlmConfig, RlmRouter, RoutingContext};
11use crate::rlm::router::AutoProcessContext;
12use crate::tool::ToolRegistry;
13use anyhow::Result;
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::path::PathBuf;
18use std::sync::Arc;
19use tokio::fs;
20use uuid::Uuid;
21
22#[cfg(feature = "functiongemma")]
23use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
24
/// True for tools that need a human in the loop ("question" is the only one
/// today); these are stripped out of headless session runs.
fn is_interactive_tool(tool_name: &str) -> bool {
    tool_name == "question"
}
28
/// Pick a provider from `providers`, honoring a fixed preference order.
/// Falls back to the first listed provider when none of the preferred names
/// are present; `None` only when `providers` is empty.
fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
    // Highest-priority first.
    const PREFERRED: [&str; 9] = [
        "zai",
        "openai",
        "github-copilot",
        "anthropic",
        "minimax",
        "openrouter",
        "novita",
        "moonshotai",
        "google",
    ];
    PREFERRED
        .iter()
        .find_map(|wanted| providers.iter().copied().find(|p| *p == *wanted))
        .or_else(|| providers.first().copied())
}
50
/// Model families for which a sampling temperature of 1.0 is used instead of
/// the default 0.7 (matched case-insensitively by substring).
fn prefers_temperature_one(model: &str) -> bool {
    let lowered = model.to_ascii_lowercase();
    ["kimi-k2", "glm-", "minimax"]
        .iter()
        .any(|needle| lowered.contains(*needle))
}
55
/// Rough context-window size (tokens) for a model id, matched by
/// case-insensitive substring. Rules are checked in priority order — the
/// first hit wins — and unknown models default to 128k.
fn context_window_for_model(model: &str) -> usize {
    let needle = model.to_ascii_lowercase();
    // (alternative substrings, window) pairs, highest priority first.
    let rules: &[(&[&str], usize)] = &[
        (&["kimi-k2"], 256_000),
        (&["glm-5", "glm5"], 200_000),
        (&["gpt-4o"], 128_000),
        (&["gpt-5"], 256_000),
        (&["claude"], 200_000),
        (&["gemini"], 1_000_000),
        (&["minimax", "m2.5"], 256_000),
        (&["qwen"], 131_072),
    ];
    for (patterns, window) in rules {
        if patterns.iter().any(|pat| needle.contains(*pat)) {
            return *window;
        }
    }
    128_000
}
79
/// A persisted chat session: transcript, tool history, token usage, and
/// bookkeeping metadata. Serialized to pretty JSON on disk (see `save`/`load`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    /// UUIDv4 identifier; also keys the on-disk session file and the
    /// per-session event-stream directory.
    pub id: String,
    /// Human-readable title derived from the first user message, once set.
    pub title: Option<String>,
    pub created_at: DateTime<Utc>,
    /// Bumped every time a message is appended (see `add_message`).
    pub updated_at: DateTime<Utc>,
    /// Full conversation history. The system prompt is NOT stored here; the
    /// agent loops rebuild and prepend it on every completion request.
    pub messages: Vec<Message>,
    pub tool_uses: Vec<ToolUse>,
    pub usage: Usage,
    /// Agent profile name; defaults to "build" for new sessions.
    pub agent: String,
    pub metadata: SessionMetadata,
}
93
/// Auxiliary per-session settings persisted alongside the transcript.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionMetadata {
    /// Workspace directory the session was started in; used by
    /// `last_for_directory` to match sessions to a workspace.
    pub directory: Option<PathBuf>,
    /// Optional model override, parsed via `parse_model_string` (may encode
    /// "provider/model" or a bare provider or model name).
    pub model: Option<String>,
    pub shared: bool,
    pub share_url: Option<String>,
}
101
102impl Session {
103 fn default_model_for_provider(provider: &str) -> String {
104 match provider {
105 "moonshotai" => "kimi-k2.5".to_string(),
106 "anthropic" => "claude-sonnet-4-20250514".to_string(),
107 "minimax" => "MiniMax-M2.5".to_string(),
108 "openai" => "gpt-4o".to_string(),
109 "google" => "gemini-2.5-pro".to_string(),
110 "zhipuai" | "zai" => "glm-5".to_string(),
111 "openrouter" => "z-ai/glm-5".to_string(),
113 "novita" => "qwen/qwen3-coder-next".to_string(),
114 "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini".to_string(),
115 _ => "glm-5".to_string(),
116 }
117 }
118
119 pub async fn new() -> Result<Self> {
121 let id = Uuid::new_v4().to_string();
122 let now = Utc::now();
123
124 Ok(Self {
125 id,
126 title: None,
127 created_at: now,
128 updated_at: now,
129 messages: Vec::new(),
130 tool_uses: Vec::new(),
131 usage: Usage::default(),
132 agent: "build".to_string(),
133 metadata: SessionMetadata {
134 directory: Some(std::env::current_dir()?),
135 ..Default::default()
136 },
137 })
138 }
139
140 pub async fn load(id: &str) -> Result<Self> {
142 let path = Self::session_path(id)?;
143 let content = fs::read_to_string(&path).await?;
144 let session: Session = serde_json::from_str(&content)?;
145 Ok(session)
146 }
147
148 pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
153 let sessions_dir = Self::sessions_dir()?;
154
155 if !sessions_dir.exists() {
156 anyhow::bail!("No sessions found");
157 }
158
159 let mut entries: Vec<tokio::fs::DirEntry> = Vec::new();
160 let mut read_dir = fs::read_dir(&sessions_dir).await?;
161 while let Some(entry) = read_dir.next_entry().await? {
162 entries.push(entry);
163 }
164
165 if entries.is_empty() {
166 anyhow::bail!("No sessions found");
167 }
168
169 entries.sort_by_key(|e| {
172 std::cmp::Reverse(
173 std::fs::metadata(e.path())
174 .ok()
175 .and_then(|m| m.modified().ok())
176 .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
177 )
178 });
179
180 let canonical_workspace =
181 workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
182
183 for entry in &entries {
184 let content: String = fs::read_to_string(entry.path()).await?;
185 if let Ok(session) = serde_json::from_str::<Session>(&content) {
186 if let Some(ref ws) = canonical_workspace {
188 if let Some(ref dir) = session.metadata.directory {
189 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
190 if &canonical_dir == ws {
191 return Ok(session);
192 }
193 }
194 continue;
195 }
196 return Ok(session);
197 }
198 }
199
200 anyhow::bail!("No sessions found")
201 }
202
    /// Most recently modified session regardless of workspace directory.
    pub async fn last() -> Result<Self> {
        Self::last_for_directory(None).await
    }
207
208 pub async fn save(&self) -> Result<()> {
210 let path = Self::session_path(&self.id)?;
211
212 if let Some(parent) = path.parent() {
213 fs::create_dir_all(parent).await?;
214 }
215
216 let content = serde_json::to_string_pretty(self)?;
217 fs::write(&path, content).await?;
218
219 Ok(())
220 }
221
222 pub fn add_message(&mut self, message: Message) {
224 self.messages.push(message);
225 self.updated_at = Utc::now();
226 }
227
    /// Run one user prompt through the agent loop: resolve a provider/model,
    /// compress oversized input via RLM, then alternate LLM completions and
    /// tool executions (up to 50 steps) until the model stops requesting
    /// tools. Saves the session and archives event streams before returning
    /// the concatenated assistant text.
    ///
    /// # Errors
    /// Fails when no provider is configured, or on provider/completion or
    /// save errors; individual tool failures are reported back to the model
    /// as "Error: …" tool results instead of aborting.
    pub async fn prompt(&mut self, message: &str) -> Result<SessionResult> {
        use crate::provider::{
            CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
        };

        let registry = ProviderRegistry::from_vault().await?;

        let providers = registry.list();
        if providers.is_empty() {
            anyhow::bail!(
                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
            );
        }

        tracing::info!("Available providers: {:?}", providers);

        // Resolve the session's configured model string into (provider, model).
        // "zhipuai" is normalized to its alias "zai". A bare name matching a
        // known provider is treated as provider-only; otherwise as model-only.
        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
            let (prov, model) = parse_model_string(model_str);
            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
            if prov.is_some() {
                (prov.map(|s| s.to_string()), model.to_string())
            } else if providers.contains(&model) {
                (Some(model.to_string()), String::new())
            } else {
                (None, model.to_string())
            }
        } else {
            (None, String::new())
        };

        // Configured provider must actually be available; otherwise fall back
        // to the global preference order.
        let selected_provider = provider_name
            .as_deref()
            .filter(|p| providers.contains(p))
            .or_else(|| choose_default_provider(providers.as_slice()))
            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;

        let provider = registry
            .get(selected_provider)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

        self.add_message(Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: message.to_string(),
            }],
        });

        if self.title.is_none() {
            self.generate_title().await?;
        }

        let model = if !model_id.is_empty() {
            model_id
        } else {
            Self::default_model_for_provider(selected_provider)
        };

        // RLM pre-pass: if the raw user message exceeds 35% of the model's
        // context window, compress it (or truncate on failure) before the
        // agent loop ever sees it.
        {
            let ctx_window = context_window_for_model(&model);
            let msg_tokens = RlmChunker::estimate_tokens(message);
            let threshold = (ctx_window as f64 * 0.35) as usize;
            if msg_tokens > threshold {
                tracing::info!(
                    msg_tokens,
                    threshold,
                    ctx_window,
                    "RLM: User message exceeds context threshold, compressing"
                );
                let auto_ctx = AutoProcessContext {
                    tool_id: "session_context",
                    tool_args: serde_json::json!({}),
                    session_id: &self.id,
                    abort: None,
                    on_progress: None,
                    provider: Arc::clone(&provider),
                    model: model.clone(),
                };
                let rlm_config = RlmConfig::default();
                match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
                    Ok(result) => {
                        tracing::info!(
                            input_tokens = result.stats.input_tokens,
                            output_tokens = result.stats.output_tokens,
                            "RLM: User message compressed"
                        );
                        // Replace the just-pushed user message with the
                        // compressed form plus a 500-char prefix of the
                        // original for grounding.
                        if let Some(last) = self.messages.last_mut() {
                            last.content = vec![ContentPart::Text {
                                text: format!(
                                    "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
                                    msg_tokens,
                                    result.processed,
                                    message.chars().take(500).collect::<String>()
                                ),
                            }];
                        }
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
                        // max_chars / 4 == threshold (the *4 then /4 mirrors
                        // the ~4-chars-per-token heuristic used elsewhere).
                        let max_chars = threshold * 4;
                        let truncated = RlmChunker::compress(message, max_chars / 4, None);
                        if let Some(last) = self.messages.last_mut() {
                            last.content = vec![ContentPart::Text {
                                text: truncated,
                            }];
                        }
                    }
                }
            }
        }

        // Interactive tools (e.g. "question") are excluded: nobody is
        // attached to answer them in this entry point.
        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|tool| !is_interactive_tool(&tool.name))
            .collect();

        let temperature = if prefers_temperature_one(&model) {
            Some(1.0)
        } else {
            Some(0.7)
        };

        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
        tracing::info!("Available tools: {}", tool_definitions.len());

        #[cfg(feature = "functiongemma")]
        let model_supports_tools = true;

        // System prompt is rooted at the session's workspace directory.
        let cwd = self
            .metadata
            .directory
            .clone()
            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);

        let max_steps = 50;
        let mut final_output = String::new();

        #[cfg(feature = "functiongemma")]
        let tool_router: Option<ToolCallRouter> = {
            let cfg = ToolRouterConfig::from_env();
            match ToolCallRouter::from_config(&cfg) {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                    None
                }
            }
        };

        for step in 1..=max_steps {
            tracing::info!(step = step, "Agent step starting");

            // The system prompt is re-prepended on every step; it is never
            // stored in `self.messages`.
            let mut messages = vec![Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: system_prompt.clone(),
                }],
            }];
            messages.extend(self.messages.clone());

            let request = CompletionRequest {
                messages,
                tools: tool_definitions.clone(),
                model: model.clone(),
                temperature,
                top_p: None,
                max_tokens: Some(8192),
                stop: Vec::new(),
            };

            let response = provider.complete(request).await?;

            // Optionally rewrite malformed tool calls via FunctionGemma.
            #[cfg(feature = "functiongemma")]
            let response = if let Some(ref router) = tool_router {
                router
                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
                    .await
            } else {
                response
            };

            crate::telemetry::TOKEN_USAGE.record_model_usage(
                &model,
                response.usage.prompt_tokens as u64,
                response.usage.completion_tokens as u64,
            );

            // Collect tool calls; malformed JSON arguments degrade to `{}`.
            let tool_calls: Vec<(String, String, serde_json::Value)> = response
                .message
                .content
                .iter()
                .filter_map(|part| {
                    if let ContentPart::ToolCall {
                        id,
                        name,
                        arguments,
                    } = part
                    {
                        let args: serde_json::Value =
                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                        Some((id.clone(), name.clone(), args))
                    } else {
                        None
                    }
                })
                .collect();

            // Accumulate assistant text for the final result.
            for part in &response.message.content {
                if let ContentPart::Text { text } = part {
                    if !text.is_empty() {
                        final_output.push_str(text);
                        final_output.push('\n');
                    }
                }
            }

            // No tool calls means the model is done; stop the loop.
            if tool_calls.is_empty() {
                self.add_message(response.message.clone());
                break;
            }

            self.add_message(response.message.clone());

            tracing::info!(
                step = step,
                num_tools = tool_calls.len(),
                "Executing tool calls"
            );

            for (tool_id, tool_name, tool_input) in tool_calls {
                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

                // Defense in depth: interactive tools are already filtered
                // out of `tool_definitions`, but the model may still emit one.
                if is_interactive_tool(&tool_name) {
                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                    self.add_message(Message {
                        role: Role::Tool,
                        content: vec![ContentPart::ToolResult {
                            tool_call_id: tool_id,
                            content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
                        }],
                    });
                    continue;
                }

                let exec_start = std::time::Instant::now();
                // Execute the tool; every outcome (success, error, unknown
                // tool) is audit-logged with the session id for correlation.
                let content = if let Some(tool) = tool_registry.get(&tool_name) {
                    match tool.execute(tool_input.clone()).await {
                        Ok(result) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                            if let Some(audit) = try_audit_log() {
                                audit.log_with_correlation(
                                    AuditCategory::ToolExecution,
                                    format!("tool:{}", tool_name),
                                    if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
                                    None,
                                    Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
                                    None, None, None, Some(self.id.clone()), ).await;
                            }
                            result.output
                        }
                        Err(e) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                            if let Some(audit) = try_audit_log() {
                                audit.log_with_correlation(
                                    AuditCategory::ToolExecution,
                                    format!("tool:{}", tool_name),
                                    AuditOutcome::Failure,
                                    None,
                                    Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
                                    None, None, None, Some(self.id.clone()), ).await;
                            }
                            format!("Error: {}", e)
                        }
                    }
                } else {
                    tracing::warn!(tool = %tool_name, "Tool not found");
                    if let Some(audit) = try_audit_log() {
                        audit
                            .log_with_correlation(
                                AuditCategory::ToolExecution,
                                format!("tool:{}", tool_name),
                                AuditOutcome::Failure,
                                None,
                                Some(json!({ "error": "unknown_tool" })),
                                None, None, None, Some(self.id.clone()), )
                            .await;
                    }
                    format!("Error: Unknown tool '{}'", tool_name)
                };

                let duration_ms = exec_start.elapsed().as_millis() as u64;
                // Heuristic success flag: all failure paths above produce
                // "Error:"-prefixed content.
                let success = !content.starts_with("Error:");

                // Fire-and-forget append of a tool-result event to the local
                // JSONL event stream (file name windows by message count).
                if let Some(base_dir) = Self::event_stream_path() {
                    let workspace = std::env::var("PWD")
                        .map(PathBuf::from)
                        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
                    let event = ChatEvent::tool_result(
                        workspace,
                        self.id.clone(),
                        &tool_name,
                        success,
                        duration_ms,
                        &content,
                        self.messages.len() as u64,
                    );
                    let event_json = event.to_json();
                    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
                    let seq = self.messages.len() as u64;
                    let filename = format!(
                        "{}-chat-events-{:020}-{:020}.jsonl",
                        timestamp,
                        seq * 10000,
                        (seq + 1) * 10000
                    );
                    let event_path = base_dir.join(&self.id).join(filename);

                    let event_path_clone = event_path;
                    tokio::spawn(async move {
                        if let Some(parent) = event_path_clone.parent() {
                            let _ = tokio::fs::create_dir_all(parent).await;
                        }
                        if let Ok(mut file) = tokio::fs::OpenOptions::new()
                            .create(true)
                            .append(true)
                            .open(&event_path_clone)
                            .await
                        {
                            use tokio::io::AsyncWriteExt;
                            let _ = file.write_all(event_json.as_bytes()).await;
                            let _ = file.write_all(b"\n").await;
                        }
                    });
                }

                // RLM post-pass: route oversized tool output through the RLM
                // (or smart truncation on failure) before it re-enters the
                // context; `content` is shadowed with the processed form.
                let content = {
                    let ctx_window = context_window_for_model(&model);
                    // ~4 chars per token across all text/tool-result parts.
                    let total_chars: usize = self.messages.iter().map(|m| m.content.iter().map(|p| match p {
                        ContentPart::Text { text } => text.len(),
                        ContentPart::ToolResult { content, .. } => content.len(),
                        _ => 0,
                    }).sum::<usize>()).sum();
                    let current_tokens = total_chars / 4;
                    let routing_ctx = RoutingContext {
                        tool_id: tool_name.clone(),
                        session_id: self.id.clone(),
                        call_id: Some(tool_id.clone()),
                        model_context_limit: ctx_window,
                        current_context_tokens: Some(current_tokens),
                    };
                    let rlm_config = RlmConfig::default();
                    let routing = RlmRouter::should_route(&content, &routing_ctx, &rlm_config);
                    if routing.should_route {
                        tracing::info!(
                            tool = %tool_name,
                            reason = %routing.reason,
                            estimated_tokens = routing.estimated_tokens,
                            "RLM: Routing large tool output"
                        );
                        let auto_ctx = AutoProcessContext {
                            tool_id: &tool_name,
                            tool_args: tool_input.clone(),
                            session_id: &self.id,
                            abort: None,
                            on_progress: None,
                            provider: Arc::clone(&provider),
                            model: model.clone(),
                        };
                        match RlmRouter::auto_process(&content, auto_ctx, &rlm_config).await {
                            Ok(result) => {
                                tracing::info!(
                                    input_tokens = result.stats.input_tokens,
                                    output_tokens = result.stats.output_tokens,
                                    iterations = result.stats.iterations,
                                    "RLM: Processing complete"
                                );
                                result.processed
                            }
                            Err(e) => {
                                tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
                                let (truncated, _, _) = RlmRouter::smart_truncate(
                                    &content, &tool_name, &tool_input, ctx_window / 4,
                                );
                                truncated
                            }
                        }
                    } else {
                        content
                    }
                };

                self.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
            }
        }

        self.save().await?;

        // Best-effort; spawns its own task and never blocks the result.
        self.archive_event_stream_to_s3().await;

        Ok(SessionResult {
            text: final_output.trim().to_string(),
            session_id: self.id.clone(),
        })
    }
696
697 async fn archive_event_stream_to_s3(&self) {
699 if !S3Sink::is_configured() {
701 return;
702 }
703
704 let Some(base_dir) = Self::event_stream_path() else {
705 return;
706 };
707
708 let session_event_dir = base_dir.join(&self.id);
709 if !session_event_dir.exists() {
710 return;
711 }
712
713 let Ok(sink) = S3Sink::from_env().await else {
715 tracing::warn!("Failed to create S3 sink for archival");
716 return;
717 };
718
719 let session_id = self.id.clone();
721 tokio::spawn(async move {
722 if let Ok(mut entries) = tokio::fs::read_dir(&session_event_dir).await {
723 while let Ok(Some(entry)) = entries.next_entry().await {
724 let path = entry.path();
725 if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
726 match sink.upload_file(&path, &session_id).await {
727 Ok(url) => {
728 tracing::info!(url = %url, "Archived event stream to S3/R2");
729 }
730 Err(e) => {
731 tracing::warn!(error = %e, "Failed to archive event file to S3");
732 }
733 }
734 }
735 }
736 }
737 });
738 }
739
    /// Streaming variant of [`Session::prompt`]: the same agent loop, but it
    /// emits `SessionEvent`s (thinking, text, tool start/complete, usage,
    /// session sync, done) over `event_tx` as it runs, and takes an
    /// externally-built provider registry instead of loading one from Vault.
    ///
    /// # Errors
    /// Fails when no provider is configured, or on provider/completion or
    /// save errors; individual tool failures are reported back to the model
    /// as "Error: …" tool results instead of aborting.
    pub async fn prompt_with_events(
        &mut self,
        message: &str,
        event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
        registry: std::sync::Arc<crate::provider::ProviderRegistry>,
    ) -> Result<SessionResult> {
        use crate::provider::{CompletionRequest, ContentPart, Role, parse_model_string};

        // Send errors are ignored throughout: a dropped receiver must not
        // abort the agent run.
        let _ = event_tx.send(SessionEvent::Thinking).await;

        let providers = registry.list();
        if providers.is_empty() {
            anyhow::bail!(
                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
            );
        }
        tracing::info!("Available providers: {:?}", providers);

        // Same provider/model resolution as `prompt` ("zhipuai" aliased to
        // "zai"; bare provider names allowed).
        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
            let (prov, model) = parse_model_string(model_str);
            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
            if prov.is_some() {
                (prov.map(|s| s.to_string()), model.to_string())
            } else if providers.contains(&model) {
                (Some(model.to_string()), String::new())
            } else {
                (None, model.to_string())
            }
        } else {
            (None, String::new())
        };

        let selected_provider = provider_name
            .as_deref()
            .filter(|p| providers.contains(p))
            .or_else(|| choose_default_provider(providers.as_slice()))
            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;

        let provider = registry
            .get(selected_provider)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

        self.add_message(Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: message.to_string(),
            }],
        });

        if self.title.is_none() {
            self.generate_title().await?;
        }

        let model = if !model_id.is_empty() {
            model_id
        } else {
            Self::default_model_for_provider(selected_provider)
        };

        // RLM pre-pass: compress the user message when it exceeds 35% of the
        // model's context window (truncate on compression failure).
        {
            let ctx_window = context_window_for_model(&model);
            let msg_tokens = RlmChunker::estimate_tokens(message);
            let threshold = (ctx_window as f64 * 0.35) as usize;
            if msg_tokens > threshold {
                tracing::info!(
                    msg_tokens,
                    threshold,
                    ctx_window,
                    "RLM: User message exceeds context threshold, compressing"
                );
                let auto_ctx = AutoProcessContext {
                    tool_id: "session_context",
                    tool_args: serde_json::json!({}),
                    session_id: &self.id,
                    abort: None,
                    on_progress: None,
                    provider: Arc::clone(&provider),
                    model: model.clone(),
                };
                let rlm_config = RlmConfig::default();
                match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
                    Ok(result) => {
                        tracing::info!(
                            input_tokens = result.stats.input_tokens,
                            output_tokens = result.stats.output_tokens,
                            "RLM: User message compressed"
                        );
                        // Replace the just-pushed user message with the
                        // compressed form plus a 500-char original prefix.
                        if let Some(last) = self.messages.last_mut() {
                            last.content = vec![ContentPart::Text {
                                text: format!(
                                    "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
                                    msg_tokens,
                                    result.processed,
                                    message.chars().take(500).collect::<String>()
                                ),
                            }];
                        }
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
                        // max_chars / 4 == threshold; mirrors the ~4 chars per
                        // token heuristic used elsewhere.
                        let max_chars = threshold * 4;
                        let truncated = RlmChunker::compress(message, max_chars / 4, None);
                        if let Some(last) = self.messages.last_mut() {
                            last.content = vec![ContentPart::Text {
                                text: truncated,
                            }];
                        }
                    }
                }
            }
        }

        // Interactive tools are excluded from what the model may call.
        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|tool| !is_interactive_tool(&tool.name))
            .collect();

        let temperature = if prefers_temperature_one(&model) {
            Some(1.0)
        } else {
            Some(0.7)
        };

        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
        tracing::info!("Available tools: {}", tool_definitions.len());

        #[cfg(feature = "functiongemma")]
        let model_supports_tools = true;

        // NOTE(review): `prompt` roots the system prompt at
        // `metadata.directory`, this variant at $PWD — confirm the divergence
        // is intentional.
        let cwd = std::env::var("PWD")
            .map(std::path::PathBuf::from)
            .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);

        let mut final_output = String::new();
        let max_steps = 50;

        #[cfg(feature = "functiongemma")]
        let tool_router: Option<ToolCallRouter> = {
            let cfg = ToolRouterConfig::from_env();
            match ToolCallRouter::from_config(&cfg) {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                    None
                }
            }
        };

        for step in 1..=max_steps {
            tracing::info!(step = step, "Agent step starting");
            let _ = event_tx.send(SessionEvent::Thinking).await;

            // System prompt is re-prepended every step; never persisted.
            let mut messages = vec![Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: system_prompt.clone(),
                }],
            }];
            messages.extend(self.messages.clone());

            let request = CompletionRequest {
                messages,
                tools: tool_definitions.clone(),
                model: model.clone(),
                temperature,
                top_p: None,
                max_tokens: Some(8192),
                stop: Vec::new(),
            };

            // Time the LLM round-trip for the usage report below.
            let llm_start = std::time::Instant::now();
            let response = provider.complete(request).await?;
            let llm_duration_ms = llm_start.elapsed().as_millis() as u64;

            // Optionally rewrite malformed tool calls via FunctionGemma.
            #[cfg(feature = "functiongemma")]
            let response = if let Some(ref router) = tool_router {
                router
                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
                    .await
            } else {
                response
            };

            crate::telemetry::TOKEN_USAGE.record_model_usage(
                &model,
                response.usage.prompt_tokens as u64,
                response.usage.completion_tokens as u64,
            );

            let _ = event_tx
                .send(SessionEvent::UsageReport {
                    prompt_tokens: response.usage.prompt_tokens,
                    completion_tokens: response.usage.completion_tokens,
                    duration_ms: llm_duration_ms,
                    model: model.clone(),
                })
                .await;

            // Collect tool calls; malformed JSON arguments degrade to `{}`.
            let tool_calls: Vec<(String, String, serde_json::Value)> = response
                .message
                .content
                .iter()
                .filter_map(|part| {
                    if let ContentPart::ToolCall {
                        id,
                        name,
                        arguments,
                    } = part
                    {
                        let args: serde_json::Value =
                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                        Some((id.clone(), name.clone(), args))
                    } else {
                        None
                    }
                })
                .collect();

            // Split the response into reasoning text and user-visible text so
            // they can be streamed as distinct events.
            let mut thinking_text = String::new();
            let mut step_text = String::new();
            for part in &response.message.content {
                match part {
                    ContentPart::Thinking { text } => {
                        if !text.is_empty() {
                            thinking_text.push_str(text);
                            thinking_text.push('\n');
                        }
                    }
                    ContentPart::Text { text } => {
                        if !text.is_empty() {
                            step_text.push_str(text);
                            step_text.push('\n');
                        }
                    }
                    _ => {}
                }
            }

            if !thinking_text.trim().is_empty() {
                let _ = event_tx
                    .send(SessionEvent::ThinkingComplete(
                        thinking_text.trim().to_string(),
                    ))
                    .await;
            }

            if !step_text.trim().is_empty() {
                let trimmed = step_text.trim().to_string();
                let _ = event_tx
                    .send(SessionEvent::TextChunk(trimmed.clone()))
                    .await;
                let _ = event_tx.send(SessionEvent::TextComplete(trimmed)).await;
                final_output.push_str(&step_text);
            }

            // No tool calls: the model has finished.
            if tool_calls.is_empty() {
                self.add_message(response.message.clone());
                break;
            }

            self.add_message(response.message.clone());

            tracing::info!(
                step = step,
                num_tools = tool_calls.len(),
                "Executing tool calls"
            );

            for (tool_id, tool_name, tool_input) in tool_calls {
                let args_str = serde_json::to_string(&tool_input).unwrap_or_default();
                let _ = event_tx
                    .send(SessionEvent::ToolCallStart {
                        name: tool_name.clone(),
                        arguments: args_str,
                    })
                    .await;

                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

                // Defense in depth: interactive tools were filtered from the
                // definitions, but the model may still request one.
                if is_interactive_tool(&tool_name) {
                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                    let content = "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string();
                    let _ = event_tx
                        .send(SessionEvent::ToolCallComplete {
                            name: tool_name.clone(),
                            output: content.clone(),
                            success: false,
                        })
                        .await;
                    self.add_message(Message {
                        role: Role::Tool,
                        content: vec![ContentPart::ToolResult {
                            tool_call_id: tool_id,
                            content,
                        }],
                    });
                    continue;
                }

                let exec_start = std::time::Instant::now();
                // Execute; every outcome is audit-logged with the session id.
                let (content, success) = if let Some(tool) = tool_registry.get(&tool_name) {
                    match tool.execute(tool_input.clone()).await {
                        Ok(result) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                            if let Some(audit) = try_audit_log() {
                                audit.log_with_correlation(
                                    AuditCategory::ToolExecution,
                                    format!("tool:{}", tool_name),
                                    if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
                                    None,
                                    Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
                                    None, None, None, Some(self.id.clone()), ).await;
                            }
                            (result.output, result.success)
                        }
                        Err(e) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                            if let Some(audit) = try_audit_log() {
                                audit.log_with_correlation(
                                    AuditCategory::ToolExecution,
                                    format!("tool:{}", tool_name),
                                    AuditOutcome::Failure,
                                    None,
                                    Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
                                    None, None, None, Some(self.id.clone()), ).await;
                            }
                            (format!("Error: {}", e), false)
                        }
                    }
                } else {
                    tracing::warn!(tool = %tool_name, "Tool not found");
                    if let Some(audit) = try_audit_log() {
                        audit
                            .log_with_correlation(
                                AuditCategory::ToolExecution,
                                format!("tool:{}", tool_name),
                                AuditOutcome::Failure,
                                None,
                                Some(json!({ "error": "unknown_tool" })),
                                None, None, None, Some(self.id.clone()), )
                            .await;
                    }
                    (format!("Error: Unknown tool '{}'", tool_name), false)
                };

                let duration_ms = exec_start.elapsed().as_millis() as u64;

                // Fire-and-forget JSONL event-stream append (same naming
                // scheme as `prompt`).
                if let Some(base_dir) = Self::event_stream_path() {
                    let workspace = std::env::var("PWD")
                        .map(PathBuf::from)
                        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
                    let event = ChatEvent::tool_result(
                        workspace,
                        self.id.clone(),
                        &tool_name,
                        success,
                        duration_ms,
                        &content,
                        self.messages.len() as u64,
                    );
                    let event_json = event.to_json();
                    // +1 accounts for the trailing newline written below.
                    let event_size = event_json.len() as u64 + 1;
                    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
                    let seq = self.messages.len() as u64;
                    let filename = format!(
                        "{}-chat-events-{:020}-{:020}.jsonl",
                        timestamp,
                        seq * 10000,
                        (seq + 1) * 10000
                    );
                    let event_path = base_dir.join(&self.id).join(filename);

                    let event_path_clone = event_path;
                    tokio::spawn(async move {
                        if let Some(parent) = event_path_clone.parent() {
                            let _ = tokio::fs::create_dir_all(parent).await;
                        }
                        if let Ok(mut file) = tokio::fs::OpenOptions::new()
                            .create(true)
                            .append(true)
                            .open(&event_path_clone)
                            .await
                        {
                            use tokio::io::AsyncWriteExt;
                            let _ = file.write_all(event_json.as_bytes()).await;
                            let _ = file.write_all(b"\n").await;
                            tracing::debug!(path = %event_path_clone.display(), size = event_size, "Event stream wrote");
                        }
                    });
                }

                let _ = event_tx
                    .send(SessionEvent::ToolCallComplete {
                        name: tool_name.clone(),
                        output: content.clone(),
                        success,
                    })
                    .await;

                // RLM post-pass: route oversized tool output through the RLM
                // (or smart truncation on failure); `content` is shadowed
                // with the processed form before being stored.
                let content = {
                    let ctx_window = context_window_for_model(&model);
                    // ~4 chars per token across all text/tool-result parts.
                    let total_chars: usize = self.messages.iter().map(|m| m.content.iter().map(|p| match p {
                        ContentPart::Text { text } => text.len(),
                        ContentPart::ToolResult { content, .. } => content.len(),
                        _ => 0,
                    }).sum::<usize>()).sum();
                    let current_tokens = total_chars / 4;
                    let routing_ctx = RoutingContext {
                        tool_id: tool_name.clone(),
                        session_id: self.id.clone(),
                        call_id: Some(tool_id.clone()),
                        model_context_limit: ctx_window,
                        current_context_tokens: Some(current_tokens),
                    };
                    let rlm_config = RlmConfig::default();
                    let routing = RlmRouter::should_route(&content, &routing_ctx, &rlm_config);
                    if routing.should_route {
                        tracing::info!(
                            tool = %tool_name,
                            reason = %routing.reason,
                            estimated_tokens = routing.estimated_tokens,
                            "RLM: Routing large tool output"
                        );
                        let auto_ctx = AutoProcessContext {
                            tool_id: &tool_name,
                            tool_args: tool_input.clone(),
                            session_id: &self.id,
                            abort: None,
                            on_progress: None,
                            provider: Arc::clone(&provider),
                            model: model.clone(),
                        };
                        match RlmRouter::auto_process(&content, auto_ctx, &rlm_config).await {
                            Ok(result) => {
                                tracing::info!(
                                    input_tokens = result.stats.input_tokens,
                                    output_tokens = result.stats.output_tokens,
                                    iterations = result.stats.iterations,
                                    "RLM: Processing complete"
                                );
                                result.processed
                            }
                            Err(e) => {
                                tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
                                let (truncated, _, _) = RlmRouter::smart_truncate(
                                    &content, &tool_name, &tool_input, ctx_window / 4,
                                );
                                truncated
                            }
                        }
                    } else {
                        content
                    }
                };

                self.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
            }
        }

        self.save().await?;

        self.archive_event_stream_to_s3().await;

        // Push the final session state to the client, then signal completion.
        let _ = event_tx.send(SessionEvent::SessionSync(self.clone())).await;
        let _ = event_tx.send(SessionEvent::Done).await;

        Ok(SessionResult {
            text: final_output.trim().to_string(),
            session_id: self.id.clone(),
        })
    }
1275
1276 pub async fn generate_title(&mut self) -> Result<()> {
1279 if self.title.is_some() {
1280 return Ok(());
1281 }
1282
1283 let first_message = self
1285 .messages
1286 .iter()
1287 .find(|m| m.role == crate::provider::Role::User);
1288
1289 if let Some(msg) = first_message {
1290 let text: String = msg
1291 .content
1292 .iter()
1293 .filter_map(|p| match p {
1294 crate::provider::ContentPart::Text { text } => Some(text.clone()),
1295 _ => None,
1296 })
1297 .collect::<Vec<_>>()
1298 .join(" ");
1299
1300 self.title = Some(truncate_with_ellipsis(&text, 47));
1302 }
1303
1304 Ok(())
1305 }
1306
1307 pub async fn regenerate_title(&mut self) -> Result<()> {
1310 let first_message = self
1312 .messages
1313 .iter()
1314 .find(|m| m.role == crate::provider::Role::User);
1315
1316 if let Some(msg) = first_message {
1317 let text: String = msg
1318 .content
1319 .iter()
1320 .filter_map(|p| match p {
1321 crate::provider::ContentPart::Text { text } => Some(text.clone()),
1322 _ => None,
1323 })
1324 .collect::<Vec<_>>()
1325 .join(" ");
1326
1327 self.title = Some(truncate_with_ellipsis(&text, 47));
1329 }
1330
1331 Ok(())
1332 }
1333
1334 pub fn set_title(&mut self, title: impl Into<String>) {
1336 self.title = Some(title.into());
1337 self.updated_at = Utc::now();
1338 }
1339
1340 pub fn clear_title(&mut self) {
1342 self.title = None;
1343 self.updated_at = Utc::now();
1344 }
1345
1346 pub async fn on_context_change(&mut self, regenerate_title: bool) -> Result<()> {
1349 self.updated_at = Utc::now();
1350
1351 if regenerate_title {
1352 self.regenerate_title().await?;
1353 }
1354
1355 Ok(())
1356 }
1357
1358 pub async fn from_opencode(
1363 session_id: &str,
1364 storage: &crate::opencode::OpenCodeStorage,
1365 ) -> Result<Self> {
1366 let oc_session = storage.load_session(session_id).await?;
1367 let oc_messages = storage.load_messages(session_id).await?;
1368
1369 let mut messages_with_parts = Vec::new();
1370 for msg in oc_messages {
1371 let parts = storage.load_parts(&msg.id).await?;
1372 messages_with_parts.push((msg, parts));
1373 }
1374
1375 crate::opencode::convert::to_codetether_session(&oc_session, messages_with_parts).await
1376 }
1377
1378 pub async fn last_opencode_for_directory(dir: &std::path::Path) -> Result<Self> {
1380 let storage = crate::opencode::OpenCodeStorage::new()
1381 .ok_or_else(|| anyhow::anyhow!("OpenCode storage directory not found"))?;
1382
1383 if !storage.exists() {
1384 anyhow::bail!("OpenCode storage does not exist");
1385 }
1386
1387 let oc_session = storage.last_session_for_directory(dir).await?;
1388 Self::from_opencode(&oc_session.id, &storage).await
1389 }
1390
1391 pub async fn delete(id: &str) -> Result<()> {
1393 let path = Self::session_path(id)?;
1394 if path.exists() {
1395 tokio::fs::remove_file(&path).await?;
1396 }
1397 Ok(())
1398 }
1399
1400 fn sessions_dir() -> Result<PathBuf> {
1402 crate::config::Config::data_dir()
1403 .map(|d| d.join("sessions"))
1404 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
1405 }
1406
1407 fn session_path(id: &str) -> Result<PathBuf> {
1409 Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
1410 }
1411
1412 fn event_stream_path() -> Option<PathBuf> {
1415 std::env::var("CODETETHER_EVENT_STREAM_PATH")
1416 .ok()
1417 .map(PathBuf::from)
1418 }
1419}
1420
/// Final outcome of a completed session run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionResult {
    // Final assistant output text, trimmed of surrounding whitespace.
    pub text: String,
    // Id of the session that produced the text (usable for resuming it).
    pub session_id: String,
}
1427
/// Progress events emitted over the session's event channel while it runs.
#[derive(Debug, Clone)]
pub enum SessionEvent {
    // The model is working; no output has been produced yet.
    Thinking,
    // A tool invocation is starting, with its raw argument payload.
    ToolCallStart { name: String, arguments: String },
    // A tool invocation finished; `output` is its (possibly processed)
    // result and `success` whether it completed without error.
    ToolCallComplete {
        name: String,
        output: String,
        success: bool,
    },
    // Incremental piece of assistant text (streaming) — presumably a delta;
    // TODO(review): confirm chunk vs. full-text semantics against the emitter.
    TextChunk(String),
    // Full assistant text once a response segment is complete.
    TextComplete(String),
    // Full reasoning/"thinking" text once complete.
    ThinkingComplete(String),
    // Token usage and timing for one model call.
    UsageReport {
        prompt_tokens: usize,
        completion_tokens: usize,
        duration_ms: u64,
        model: String,
    },
    // Snapshot of the entire session state, sent so consumers can sync.
    SessionSync(Session),
    // Terminal event: the run has finished.
    Done,
    // Terminal failure carrying a human-readable message.
    Error(String),
}
1461
1462pub async fn list_sessions() -> Result<Vec<SessionSummary>> {
1464 let sessions_dir = crate::config::Config::data_dir()
1465 .map(|d| d.join("sessions"))
1466 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
1467
1468 if !sessions_dir.exists() {
1469 return Ok(Vec::new());
1470 }
1471
1472 let mut summaries = Vec::new();
1473 let mut entries = fs::read_dir(&sessions_dir).await?;
1474
1475 while let Some(entry) = entries.next_entry().await? {
1476 let path = entry.path();
1477 if path.extension().map(|e| e == "json").unwrap_or(false) {
1478 if let Ok(content) = fs::read_to_string(&path).await {
1479 if let Ok(session) = serde_json::from_str::<Session>(&content) {
1480 summaries.push(SessionSummary {
1481 id: session.id,
1482 title: session.title,
1483 created_at: session.created_at,
1484 updated_at: session.updated_at,
1485 message_count: session.messages.len(),
1486 agent: session.agent,
1487 directory: session.metadata.directory,
1488 });
1489 }
1490 }
1491 }
1492 }
1493
1494 summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1495 Ok(summaries)
1496}
1497
1498pub async fn list_sessions_for_directory(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
1503 let all = list_sessions().await?;
1504 let canonical = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
1505 Ok(all
1506 .into_iter()
1507 .filter(|s| {
1508 s.directory
1509 .as_ref()
1510 .map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()) == canonical)
1511 .unwrap_or(false)
1512 })
1513 .collect())
1514}
1515
1516pub async fn list_sessions_with_opencode(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
1522 let mut sessions = list_sessions_for_directory(dir).await?;
1523
1524 if let Some(storage) = crate::opencode::OpenCodeStorage::new() {
1526 if storage.exists() {
1527 if let Ok(oc_sessions) = storage.list_sessions_for_directory(dir).await {
1528 for oc in oc_sessions {
1529 let import_id = format!("opencode_{}", oc.id);
1531 if sessions.iter().any(|s| s.id == import_id) {
1532 continue;
1533 }
1534
1535 sessions.push(SessionSummary {
1536 id: import_id,
1537 title: Some(format!("[opencode] {}", oc.title)),
1538 created_at: oc.created_at,
1539 updated_at: oc.updated_at,
1540 message_count: oc.message_count,
1541 agent: "build".to_string(),
1542 directory: Some(PathBuf::from(&oc.directory)),
1543 });
1544 }
1545 }
1546 }
1547 }
1548
1549 sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1551 Ok(sessions)
1552}
1553
1554pub async fn list_sessions_with_opencode_paged(
1562 dir: &std::path::Path,
1563 limit: usize,
1564 offset: usize,
1565) -> Result<Vec<SessionSummary>> {
1566 let mut sessions = list_sessions_for_directory(dir).await?;
1567
1568 if let Some(storage) = crate::opencode::OpenCodeStorage::new() {
1570 if storage.exists() {
1571 if let Ok(oc_sessions) = storage.list_sessions_for_directory(dir).await {
1572 for oc in oc_sessions {
1573 let import_id = format!("opencode_{}", oc.id);
1575 if sessions.iter().any(|s| s.id == import_id) {
1576 continue;
1577 }
1578
1579 sessions.push(SessionSummary {
1580 id: import_id,
1581 title: Some(format!("[opencode] {}", oc.title)),
1582 created_at: oc.created_at,
1583 updated_at: oc.updated_at,
1584 message_count: oc.message_count,
1585 agent: "build".to_string(),
1586 directory: Some(PathBuf::from(&oc.directory)),
1587 });
1588 }
1589 }
1590 }
1591 }
1592
1593 sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1595
1596 Ok(sessions.into_iter().skip(offset).take(limit).collect())
1598}
1599
/// Lightweight, serializable view of a stored session, used for listings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    pub id: String,
    // Optional human-readable title; OpenCode imports carry a
    // "[opencode] " prefix.
    pub title: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    // Number of messages in the session at summary time.
    pub message_count: usize,
    pub agent: String,
    // Working directory the session was recorded in; used to filter
    // listings per directory. Defaults to None for older records.
    #[serde(default)]
    pub directory: Option<PathBuf>,
}
1613
/// Truncate `value` to at most `max_chars` characters (chars, not bytes),
/// appending `"..."` when anything was cut off.
///
/// A `max_chars` of 0 yields an empty string; strings that fit within the
/// budget are returned unchanged (without an ellipsis).
fn truncate_with_ellipsis(value: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // `char_indices().nth(max_chars)` locates the byte offset of the first
    // character *beyond* the budget; if there is none, the string fits.
    match value.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}...", &value[..cut]),
        None => value.to_string(),
    }
}
1635
1636#[allow(dead_code)]
1638use futures::StreamExt;
1639
// Helper trait to drain an async stream into a `Vec`.
//
// NOTE(review): this is dead scaffolding (`#[allow(dead_code)]`). `async fn`
// in traits requires Rust 1.75+, and the method name `collect` shadows
// `futures::StreamExt::collect` for any `Unpin` stream wherever this trait
// is in scope — confirm the shadowing is intentional.
#[allow(dead_code)]
trait AsyncCollect<T> {
    async fn collect(self) -> Vec<T>;
}
1644
1645#[allow(dead_code)]
1646impl<S, T> AsyncCollect<T> for S
1647where
1648 S: futures::Stream<Item = T> + Unpin,
1649{
1650 async fn collect(mut self) -> Vec<T> {
1651 let mut items = Vec::new();
1652 while let Some(item) = self.next().await {
1653 items.push(item);
1654 }
1655 items
1656 }
1657}