1use crate::agent::ToolUse;
6use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
7use crate::event_stream::ChatEvent;
8use crate::event_stream::s3_sink::S3Sink;
9use crate::provider::{Message, Usage};
10use crate::tool::ToolRegistry;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use std::path::PathBuf;
16use std::sync::Arc;
17use tokio::fs;
18use uuid::Uuid;
19
20#[cfg(feature = "functiongemma")]
21use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
22
/// Reports whether `tool_name` names a tool that needs live user interaction.
///
/// Only the `question` tool is currently treated as interactive; the match is
/// exact and case-sensitive.
fn is_interactive_tool(tool_name: &str) -> bool {
    tool_name == "question"
}
26
/// Picks the provider to fall back on when the session does not name one.
///
/// Walks a fixed preference order and returns the first preferred name that
/// is present in `providers`. When none of the preferred names is available
/// the first entry of `providers` is returned; an empty slice yields `None`.
fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
    // Earlier entries win over later ones.
    const PREFERENCE_ORDER: [&str; 9] = [
        "zai",
        "openai",
        "github-copilot",
        "anthropic",
        "minimax",
        "openrouter",
        "novita",
        "moonshotai",
        "google",
    ];

    PREFERENCE_ORDER
        .iter()
        .find_map(|wanted| {
            providers
                .iter()
                .copied()
                .find(|candidate| candidate == wanted)
        })
        .or_else(|| providers.first().copied())
}
48
/// Reports whether `model` belongs to a family that is run at temperature 1.0.
///
/// The check is a case-insensitive substring match against the markers
/// `kimi-k2`, `glm-` and `minimax`.
fn prefers_temperature_one(model: &str) -> bool {
    let lowered = model.to_ascii_lowercase();
    ["kimi-k2", "glm-", "minimax"]
        .iter()
        .any(|marker| lowered.contains(*marker))
}
53
/// A conversation with the agent: message history plus bookkeeping.
///
/// Sessions are serialized to JSON by `Session::save` and read back by
/// `Session::load`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    /// Unique identifier; `Session::new` fills it with a v4 UUID string.
    pub id: String,
    /// Human-readable title; `None` until one is generated or set.
    pub title: Option<String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Timestamp of the most recent change, refreshed by `add_message`,
    /// `set_title`, `clear_title` and `on_context_change`.
    pub updated_at: DateTime<Utc>,
    /// Conversation history in insertion order (user, assistant and tool messages).
    pub messages: Vec<Message>,
    /// Recorded tool uses.
    /// NOTE(review): nothing in this file writes to this field — confirm where it is populated.
    pub tool_uses: Vec<ToolUse>,
    /// Usage totals.
    /// NOTE(review): only initialised to the default here — confirm where it is updated.
    pub usage: Usage,
    /// Name of the agent the session belongs to; `Session::new` uses `"build"`.
    pub agent: String,
    /// Workspace, model and sharing settings.
    pub metadata: SessionMetadata,
}
67
/// Per-session settings stored alongside the message history.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionMetadata {
    /// Workspace directory the session is associated with; used to find the
    /// most recent session for a directory.
    pub directory: Option<PathBuf>,
    /// Model selection string, parsed with `parse_model_string` when prompting.
    /// `None` means "pick a default provider and model".
    pub model: Option<String>,
    /// Sharing flag.
    /// NOTE(review): not read or written in this file — confirm semantics.
    pub shared: bool,
    /// Share URL, if any.
    /// NOTE(review): not read or written in this file — confirm semantics.
    pub share_url: Option<String>,
}
75
76impl Session {
/// Returns the model id used when `provider` is selected without an explicit model.
///
/// Providers that are not listed fall back to `glm-5`.
fn default_model_for_provider(provider: &str) -> String {
    let model_id = match provider {
        "moonshotai" => "kimi-k2.5",
        "anthropic" => "claude-sonnet-4-20250514",
        "minimax" => "MiniMax-M2.5",
        "openai" => "gpt-4o",
        "google" => "gemini-2.5-pro",
        "zhipuai" | "zai" => "glm-5",
        "openrouter" => "z-ai/glm-5",
        "novita" => "qwen/qwen3-coder-next",
        "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini",
        _ => "glm-5",
    };
    String::from(model_id)
}
92
93 pub async fn new() -> Result<Self> {
95 let id = Uuid::new_v4().to_string();
96 let now = Utc::now();
97
98 Ok(Self {
99 id,
100 title: None,
101 created_at: now,
102 updated_at: now,
103 messages: Vec::new(),
104 tool_uses: Vec::new(),
105 usage: Usage::default(),
106 agent: "build".to_string(),
107 metadata: SessionMetadata {
108 directory: Some(std::env::current_dir()?),
109 ..Default::default()
110 },
111 })
112 }
113
114 pub async fn load(id: &str) -> Result<Self> {
116 let path = Self::session_path(id)?;
117 let content = fs::read_to_string(&path).await?;
118 let session: Session = serde_json::from_str(&content)?;
119 Ok(session)
120 }
121
122 pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
127 let sessions_dir = Self::sessions_dir()?;
128
129 if !sessions_dir.exists() {
130 anyhow::bail!("No sessions found");
131 }
132
133 let mut entries: Vec<tokio::fs::DirEntry> = Vec::new();
134 let mut read_dir = fs::read_dir(&sessions_dir).await?;
135 while let Some(entry) = read_dir.next_entry().await? {
136 entries.push(entry);
137 }
138
139 if entries.is_empty() {
140 anyhow::bail!("No sessions found");
141 }
142
143 entries.sort_by_key(|e| {
146 std::cmp::Reverse(
147 std::fs::metadata(e.path())
148 .ok()
149 .and_then(|m| m.modified().ok())
150 .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
151 )
152 });
153
154 let canonical_workspace =
155 workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
156
157 for entry in &entries {
158 let content: String = fs::read_to_string(entry.path()).await?;
159 if let Ok(session) = serde_json::from_str::<Session>(&content) {
160 if let Some(ref ws) = canonical_workspace {
162 if let Some(ref dir) = session.metadata.directory {
163 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
164 if &canonical_dir == ws {
165 return Ok(session);
166 }
167 }
168 continue;
169 }
170 return Ok(session);
171 }
172 }
173
174 anyhow::bail!("No sessions found")
175 }
176
177 pub async fn last() -> Result<Self> {
179 Self::last_for_directory(None).await
180 }
181
182 pub async fn save(&self) -> Result<()> {
184 let path = Self::session_path(&self.id)?;
185
186 if let Some(parent) = path.parent() {
187 fs::create_dir_all(parent).await?;
188 }
189
190 let content = serde_json::to_string_pretty(self)?;
191 fs::write(&path, content).await?;
192
193 Ok(())
194 }
195
196 pub fn add_message(&mut self, message: Message) {
198 self.messages.push(message);
199 self.updated_at = Utc::now();
200 }
201
202 pub async fn prompt(&mut self, message: &str) -> Result<SessionResult> {
204 use crate::provider::{
205 CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
206 };
207
208 let registry = ProviderRegistry::from_vault().await?;
210
211 let providers = registry.list();
212 if providers.is_empty() {
213 anyhow::bail!(
214 "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
215 );
216 }
217
218 tracing::info!("Available providers: {:?}", providers);
219
220 let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
222 let (prov, model) = parse_model_string(model_str);
223 let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
224 if prov.is_some() {
225 (prov.map(|s| s.to_string()), model.to_string())
227 } else if providers.contains(&model) {
228 (Some(model.to_string()), String::new())
230 } else {
231 (None, model.to_string())
233 }
234 } else {
235 (None, String::new())
236 };
237
238 let selected_provider = provider_name
240 .as_deref()
241 .filter(|p| providers.contains(p))
242 .or_else(|| choose_default_provider(providers.as_slice()))
243 .ok_or_else(|| anyhow::anyhow!("No providers available"))?;
244
245 let provider = registry
246 .get(selected_provider)
247 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
248
249 self.add_message(Message {
251 role: Role::User,
252 content: vec![ContentPart::Text {
253 text: message.to_string(),
254 }],
255 });
256
257 if self.title.is_none() {
259 self.generate_title().await?;
260 }
261
262 let model = if !model_id.is_empty() {
264 model_id
265 } else {
266 Self::default_model_for_provider(selected_provider)
267 };
268
269 let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
271 let tool_definitions: Vec<_> = tool_registry
272 .definitions()
273 .into_iter()
274 .filter(|tool| !is_interactive_tool(&tool.name))
275 .collect();
276
277 let temperature = if prefers_temperature_one(&model) {
282 Some(1.0)
283 } else {
284 Some(0.7)
285 };
286
287 tracing::info!("Using model: {} via provider: {}", model, selected_provider);
288 tracing::info!("Available tools: {}", tool_definitions.len());
289
290 #[cfg(feature = "functiongemma")]
293 let model_supports_tools = true;
294
295 let cwd = self
297 .metadata
298 .directory
299 .clone()
300 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
301 let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);
302
303 let max_steps = 50;
305 let mut final_output = String::new();
306
307 #[cfg(feature = "functiongemma")]
309 let tool_router: Option<ToolCallRouter> = {
310 let cfg = ToolRouterConfig::from_env();
311 match ToolCallRouter::from_config(&cfg) {
312 Ok(r) => r,
313 Err(e) => {
314 tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
315 None
316 }
317 }
318 };
319
320 for step in 1..=max_steps {
321 tracing::info!(step = step, "Agent step starting");
322
323 let mut messages = vec![Message {
325 role: Role::System,
326 content: vec![ContentPart::Text {
327 text: system_prompt.clone(),
328 }],
329 }];
330 messages.extend(self.messages.clone());
331
332 let request = CompletionRequest {
334 messages,
335 tools: tool_definitions.clone(),
336 model: model.clone(),
337 temperature,
338 top_p: None,
339 max_tokens: Some(8192),
340 stop: Vec::new(),
341 };
342
343 let response = provider.complete(request).await?;
345
346 #[cfg(feature = "functiongemma")]
350 let response = if let Some(ref router) = tool_router {
351 router
352 .maybe_reformat(response, &tool_definitions, model_supports_tools)
353 .await
354 } else {
355 response
356 };
357
358 crate::telemetry::TOKEN_USAGE.record_model_usage(
360 &model,
361 response.usage.prompt_tokens as u64,
362 response.usage.completion_tokens as u64,
363 );
364
365 let tool_calls: Vec<(String, String, serde_json::Value)> = response
367 .message
368 .content
369 .iter()
370 .filter_map(|part| {
371 if let ContentPart::ToolCall {
372 id,
373 name,
374 arguments,
375 } = part
376 {
377 let args: serde_json::Value =
379 serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
380 Some((id.clone(), name.clone(), args))
381 } else {
382 None
383 }
384 })
385 .collect();
386
387 for part in &response.message.content {
389 if let ContentPart::Text { text } = part {
390 if !text.is_empty() {
391 final_output.push_str(text);
392 final_output.push('\n');
393 }
394 }
395 }
396
397 if tool_calls.is_empty() {
399 self.add_message(response.message.clone());
400 break;
401 }
402
403 self.add_message(response.message.clone());
405
406 tracing::info!(
407 step = step,
408 num_tools = tool_calls.len(),
409 "Executing tool calls"
410 );
411
412 for (tool_id, tool_name, tool_input) in tool_calls {
414 tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
415
416 if is_interactive_tool(&tool_name) {
417 tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
418 self.add_message(Message {
419 role: Role::Tool,
420 content: vec![ContentPart::ToolResult {
421 tool_call_id: tool_id,
422 content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
423 }],
424 });
425 continue;
426 }
427
428 let exec_start = std::time::Instant::now();
430 let content = if let Some(tool) = tool_registry.get(&tool_name) {
431 match tool.execute(tool_input.clone()).await {
432 Ok(result) => {
433 let duration_ms = exec_start.elapsed().as_millis() as u64;
434 tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
435 if let Some(audit) = try_audit_log() {
436 audit.log(
437 AuditCategory::ToolExecution,
438 format!("tool:{}", tool_name),
439 if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
440 None,
441 Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
442 ).await;
443 }
444 result.output
445 }
446 Err(e) => {
447 let duration_ms = exec_start.elapsed().as_millis() as u64;
448 tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
449 if let Some(audit) = try_audit_log() {
450 audit.log(
451 AuditCategory::ToolExecution,
452 format!("tool:{}", tool_name),
453 AuditOutcome::Failure,
454 None,
455 Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
456 ).await;
457 }
458 format!("Error: {}", e)
459 }
460 }
461 } else {
462 tracing::warn!(tool = %tool_name, "Tool not found");
463 if let Some(audit) = try_audit_log() {
464 audit
465 .log(
466 AuditCategory::ToolExecution,
467 format!("tool:{}", tool_name),
468 AuditOutcome::Failure,
469 None,
470 Some(json!({ "error": "unknown_tool" })),
471 )
472 .await;
473 }
474 format!("Error: Unknown tool '{}'", tool_name)
475 };
476
477 let duration_ms = exec_start.elapsed().as_millis() as u64;
479 let success = !content.starts_with("Error:");
480
481 if let Some(base_dir) = Self::event_stream_path() {
483 let workspace = std::env::var("PWD")
484 .map(PathBuf::from)
485 .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
486 let event = ChatEvent::tool_result(
487 workspace,
488 self.id.clone(),
489 &tool_name,
490 success,
491 duration_ms,
492 &content,
493 self.messages.len() as u64,
494 );
495 let event_json = event.to_json();
496 let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
497 let seq = self.messages.len() as u64;
498 let filename = format!(
499 "{}-chat-events-{:020}-{:020}.jsonl",
500 timestamp,
501 seq * 10000,
502 (seq + 1) * 10000
503 );
504 let event_path = base_dir.join(&self.id).join(filename);
505
506 let event_path_clone = event_path;
507 tokio::spawn(async move {
508 if let Some(parent) = event_path_clone.parent() {
509 let _ = tokio::fs::create_dir_all(parent).await;
510 }
511 if let Ok(mut file) = tokio::fs::OpenOptions::new()
512 .create(true)
513 .append(true)
514 .open(&event_path_clone)
515 .await
516 {
517 use tokio::io::AsyncWriteExt;
518 let _ = file.write_all(event_json.as_bytes()).await;
519 let _ = file.write_all(b"\n").await;
520 }
521 });
522 }
523
524 self.add_message(Message {
526 role: Role::Tool,
527 content: vec![ContentPart::ToolResult {
528 tool_call_id: tool_id,
529 content,
530 }],
531 });
532 }
533 }
534
535 self.save().await?;
537
538 self.archive_event_stream_to_s3().await;
540
541 Ok(SessionResult {
542 text: final_output.trim().to_string(),
543 session_id: self.id.clone(),
544 })
545 }
546
547 async fn archive_event_stream_to_s3(&self) {
549 if !S3Sink::is_configured() {
551 return;
552 }
553
554 let Some(base_dir) = Self::event_stream_path() else {
555 return;
556 };
557
558 let session_event_dir = base_dir.join(&self.id);
559 if !session_event_dir.exists() {
560 return;
561 }
562
563 let Ok(sink) = S3Sink::from_env().await else {
565 tracing::warn!("Failed to create S3 sink for archival");
566 return;
567 };
568
569 let session_id = self.id.clone();
571 tokio::spawn(async move {
572 if let Ok(mut entries) = tokio::fs::read_dir(&session_event_dir).await {
573 while let Ok(Some(entry)) = entries.next_entry().await {
574 let path = entry.path();
575 if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
576 match sink.upload_file(&path, &session_id).await {
577 Ok(url) => {
578 tracing::info!(url = %url, "Archived event stream to S3/R2");
579 }
580 Err(e) => {
581 tracing::warn!(error = %e, "Failed to archive event file to S3");
582 }
583 }
584 }
585 }
586 }
587 });
588 }
589
/// Runs one user turn through the agent loop while reporting progress on `event_tx`.
///
/// Works like `prompt`, but takes an already-built provider `registry` and
/// emits `SessionEvent`s for thinking, text, usage, and tool calls. Send
/// failures on `event_tx` are ignored throughout, so a dropped receiver does
/// not abort the run. On success the final events are `SessionSync` followed
/// by `Done`.
///
/// # Errors
/// Fails if no provider is available, a completion request fails, or the
/// session cannot be saved. No `Done` event is sent on these early returns.
pub async fn prompt_with_events(
    &mut self,
    message: &str,
    event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
    registry: std::sync::Arc<crate::provider::ProviderRegistry>,
) -> Result<SessionResult> {
    use crate::provider::{CompletionRequest, ContentPart, Role, parse_model_string};

    let _ = event_tx.send(SessionEvent::Thinking).await;

    let providers = registry.list();
    if providers.is_empty() {
        anyhow::bail!(
            "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
        );
    }
    tracing::info!("Available providers: {:?}", providers);

    // Split the configured model string into an optional provider and a model id.
    let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
        let (prov, model) = parse_model_string(model_str);
        // "zhipuai" is treated as an alias of the "zai" provider.
        let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
        if prov.is_some() {
            (prov.map(|s| s.to_string()), model.to_string())
        } else if providers.contains(&model) {
            // The string is a bare provider name: use that provider's default model.
            (Some(model.to_string()), String::new())
        } else {
            (None, model.to_string())
        }
    } else {
        (None, String::new())
    };

    // A requested provider that is not registered falls back to the default choice.
    let selected_provider = provider_name
        .as_deref()
        .filter(|p| providers.contains(p))
        .or_else(|| choose_default_provider(providers.as_slice()))
        .ok_or_else(|| anyhow::anyhow!("No providers available"))?;

    let provider = registry
        .get(selected_provider)
        .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

    self.add_message(Message {
        role: Role::User,
        content: vec![ContentPart::Text {
            text: message.to_string(),
        }],
    });

    if self.title.is_none() {
        self.generate_title().await?;
    }

    let model = if !model_id.is_empty() {
        model_id
    } else {
        Self::default_model_for_provider(selected_provider)
    };

    // Interactive tools are never advertised to the model in this loop.
    let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
    let tool_definitions: Vec<_> = tool_registry
        .definitions()
        .into_iter()
        .filter(|tool| !is_interactive_tool(&tool.name))
        .collect();

    let temperature = if prefers_temperature_one(&model) {
        Some(1.0)
    } else {
        Some(0.7)
    };

    tracing::info!("Using model: {} via provider: {}", model, selected_provider);
    tracing::info!("Available tools: {}", tool_definitions.len());

    #[cfg(feature = "functiongemma")]
    let model_supports_tools = true;

    // NOTE(review): `prompt` builds the system prompt from `self.metadata.directory`,
    // while this method uses `$PWD` — confirm the difference is intended.
    let cwd = std::env::var("PWD")
        .map(std::path::PathBuf::from)
        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
    let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);

    let mut final_output = String::new();
    let max_steps = 50;

    #[cfg(feature = "functiongemma")]
    let tool_router: Option<ToolCallRouter> = {
        let cfg = ToolRouterConfig::from_env();
        match ToolCallRouter::from_config(&cfg) {
            Ok(r) => r,
            Err(e) => {
                tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                None
            }
        }
    };

    for step in 1..=max_steps {
        tracing::info!(step = step, "Agent step starting");
        let _ = event_tx.send(SessionEvent::Thinking).await;

        // Every request carries the system prompt followed by the full history.
        let mut messages = vec![Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.clone(),
            }],
        }];
        messages.extend(self.messages.clone());

        let request = CompletionRequest {
            messages,
            tools: tool_definitions.clone(),
            model: model.clone(),
            temperature,
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        // Timed so the usage report can include the request's wall-clock duration.
        let llm_start = std::time::Instant::now();
        let response = provider.complete(request).await?;
        let llm_duration_ms = llm_start.elapsed().as_millis() as u64;

        #[cfg(feature = "functiongemma")]
        let response = if let Some(ref router) = tool_router {
            router
                .maybe_reformat(response, &tool_definitions, model_supports_tools)
                .await
        } else {
            response
        };

        crate::telemetry::TOKEN_USAGE.record_model_usage(
            &model,
            response.usage.prompt_tokens as u64,
            response.usage.completion_tokens as u64,
        );

        let _ = event_tx
            .send(SessionEvent::UsageReport {
                prompt_tokens: response.usage.prompt_tokens,
                completion_tokens: response.usage.completion_tokens,
                duration_ms: llm_duration_ms,
                model: model.clone(),
            })
            .await;

        // Collect (id, name, parsed arguments); unparsable arguments become `{}`.
        let tool_calls: Vec<(String, String, serde_json::Value)> = response
            .message
            .content
            .iter()
            .filter_map(|part| {
                if let ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                } = part
                {
                    let args: serde_json::Value =
                        serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                    Some((id.clone(), name.clone(), args))
                } else {
                    None
                }
            })
            .collect();

        // Gather this step's thinking and visible text separately, one part per line.
        let mut thinking_text = String::new();
        let mut step_text = String::new();
        for part in &response.message.content {
            match part {
                ContentPart::Thinking { text } => {
                    if !text.is_empty() {
                        thinking_text.push_str(text);
                        thinking_text.push('\n');
                    }
                }
                ContentPart::Text { text } => {
                    if !text.is_empty() {
                        step_text.push_str(text);
                        step_text.push('\n');
                    }
                }
                _ => {}
            }
        }

        if !thinking_text.trim().is_empty() {
            let _ = event_tx
                .send(SessionEvent::ThinkingComplete(
                    thinking_text.trim().to_string(),
                ))
                .await;
        }

        if !step_text.trim().is_empty() {
            // The same trimmed text is sent as a chunk and then as the completed text.
            let trimmed = step_text.trim().to_string();
            let _ = event_tx
                .send(SessionEvent::TextChunk(trimmed.clone()))
                .await;
            let _ = event_tx.send(SessionEvent::TextComplete(trimmed)).await;
            final_output.push_str(&step_text);
        }

        // The assistant message is recorded on both paths; no tool calls ends the loop.
        if tool_calls.is_empty() {
            self.add_message(response.message.clone());
            break;
        }

        self.add_message(response.message.clone());

        tracing::info!(
            step = step,
            num_tools = tool_calls.len(),
            "Executing tool calls"
        );

        for (tool_id, tool_name, tool_input) in tool_calls {
            let args_str = serde_json::to_string(&tool_input).unwrap_or_default();
            let _ = event_tx
                .send(SessionEvent::ToolCallStart {
                    name: tool_name.clone(),
                    arguments: args_str,
                })
                .await;

            tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

            // Interactive tools are rejected with an error result instead of being run.
            if is_interactive_tool(&tool_name) {
                tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                let content = "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string();
                let _ = event_tx
                    .send(SessionEvent::ToolCallComplete {
                        name: tool_name.clone(),
                        output: content.clone(),
                        success: false,
                    })
                    .await;
                self.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
                continue;
            }

            let exec_start = std::time::Instant::now();
            // `success` is the tool's own report; execution errors and unknown tools count as failures.
            let (content, success) = if let Some(tool) = tool_registry.get(&tool_name) {
                match tool.execute(tool_input.clone()).await {
                    Ok(result) => {
                        let duration_ms = exec_start.elapsed().as_millis() as u64;
                        tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                        if let Some(audit) = try_audit_log() {
                            audit.log(
                                AuditCategory::ToolExecution,
                                format!("tool:{}", tool_name),
                                if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
                                None,
                                Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
                            ).await;
                        }
                        (result.output, result.success)
                    }
                    Err(e) => {
                        let duration_ms = exec_start.elapsed().as_millis() as u64;
                        tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                        if let Some(audit) = try_audit_log() {
                            audit.log(
                                AuditCategory::ToolExecution,
                                format!("tool:{}", tool_name),
                                AuditOutcome::Failure,
                                None,
                                Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
                            ).await;
                        }
                        (format!("Error: {}", e), false)
                    }
                }
            } else {
                tracing::warn!(tool = %tool_name, "Tool not found");
                if let Some(audit) = try_audit_log() {
                    audit
                        .log(
                            AuditCategory::ToolExecution,
                            format!("tool:{}", tool_name),
                            AuditOutcome::Failure,
                            None,
                            Some(json!({ "error": "unknown_tool" })),
                        )
                        .await;
                }
                (format!("Error: Unknown tool '{}'", tool_name), false)
            };

            let duration_ms = exec_start.elapsed().as_millis() as u64;

            // Event-stream logging is active only when CODETETHER_EVENT_STREAM_PATH is set.
            if let Some(base_dir) = Self::event_stream_path() {
                let workspace = std::env::var("PWD")
                    .map(PathBuf::from)
                    .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
                let event = ChatEvent::tool_result(
                    workspace,
                    self.id.clone(),
                    &tool_name,
                    success,
                    duration_ms,
                    &content,
                    self.messages.len() as u64,
                );
                let event_json = event.to_json();
                // +1 accounts for the newline written after the JSON payload.
                let event_size = event_json.len() as u64 + 1;
                let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
                // The current history length serves as the event's sequence number.
                let seq = self.messages.len() as u64;
                let filename = format!(
                    "{}-chat-events-{:020}-{:020}.jsonl",
                    timestamp,
                    seq * 10000,
                    (seq + 1) * 10000
                );
                let event_path = base_dir.join(&self.id).join(filename);

                let event_path_clone = event_path;
                // Fire-and-forget write; failures are ignored so logging never fails the prompt.
                tokio::spawn(async move {
                    if let Some(parent) = event_path_clone.parent() {
                        let _ = tokio::fs::create_dir_all(parent).await;
                    }
                    if let Ok(mut file) = tokio::fs::OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open(&event_path_clone)
                        .await
                    {
                        use tokio::io::AsyncWriteExt;
                        let _ = file.write_all(event_json.as_bytes()).await;
                        let _ = file.write_all(b"\n").await;
                        tracing::debug!(path = %event_path_clone.display(), size = event_size, "Event stream wrote");
                    }
                });
            }

            let _ = event_tx
                .send(SessionEvent::ToolCallComplete {
                    name: tool_name.clone(),
                    output: content.clone(),
                    success,
                })
                .await;

            self.add_message(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: tool_id,
                    content,
                }],
            });
        }
    }

    self.save().await?;

    self.archive_event_stream_to_s3().await;

    // Hand the receiver a full snapshot of the saved session before signalling completion.
    let _ = event_tx.send(SessionEvent::SessionSync(self.clone())).await;
    let _ = event_tx.send(SessionEvent::Done).await;

    Ok(SessionResult {
        text: final_output.trim().to_string(),
        session_id: self.id.clone(),
    })
}
1002
1003 pub async fn generate_title(&mut self) -> Result<()> {
1006 if self.title.is_some() {
1007 return Ok(());
1008 }
1009
1010 let first_message = self
1012 .messages
1013 .iter()
1014 .find(|m| m.role == crate::provider::Role::User);
1015
1016 if let Some(msg) = first_message {
1017 let text: String = msg
1018 .content
1019 .iter()
1020 .filter_map(|p| match p {
1021 crate::provider::ContentPart::Text { text } => Some(text.clone()),
1022 _ => None,
1023 })
1024 .collect::<Vec<_>>()
1025 .join(" ");
1026
1027 self.title = Some(truncate_with_ellipsis(&text, 47));
1029 }
1030
1031 Ok(())
1032 }
1033
1034 pub async fn regenerate_title(&mut self) -> Result<()> {
1037 let first_message = self
1039 .messages
1040 .iter()
1041 .find(|m| m.role == crate::provider::Role::User);
1042
1043 if let Some(msg) = first_message {
1044 let text: String = msg
1045 .content
1046 .iter()
1047 .filter_map(|p| match p {
1048 crate::provider::ContentPart::Text { text } => Some(text.clone()),
1049 _ => None,
1050 })
1051 .collect::<Vec<_>>()
1052 .join(" ");
1053
1054 self.title = Some(truncate_with_ellipsis(&text, 47));
1056 }
1057
1058 Ok(())
1059 }
1060
1061 pub fn set_title(&mut self, title: impl Into<String>) {
1063 self.title = Some(title.into());
1064 self.updated_at = Utc::now();
1065 }
1066
1067 pub fn clear_title(&mut self) {
1069 self.title = None;
1070 self.updated_at = Utc::now();
1071 }
1072
1073 pub async fn on_context_change(&mut self, regenerate_title: bool) -> Result<()> {
1076 self.updated_at = Utc::now();
1077
1078 if regenerate_title {
1079 self.regenerate_title().await?;
1080 }
1081
1082 Ok(())
1083 }
1084
1085 pub async fn from_opencode(
1090 session_id: &str,
1091 storage: &crate::opencode::OpenCodeStorage,
1092 ) -> Result<Self> {
1093 let oc_session = storage.load_session(session_id).await?;
1094 let oc_messages = storage.load_messages(session_id).await?;
1095
1096 let mut messages_with_parts = Vec::new();
1097 for msg in oc_messages {
1098 let parts = storage.load_parts(&msg.id).await?;
1099 messages_with_parts.push((msg, parts));
1100 }
1101
1102 crate::opencode::convert::to_codetether_session(&oc_session, messages_with_parts).await
1103 }
1104
1105 pub async fn last_opencode_for_directory(dir: &std::path::Path) -> Result<Self> {
1107 let storage = crate::opencode::OpenCodeStorage::new()
1108 .ok_or_else(|| anyhow::anyhow!("OpenCode storage directory not found"))?;
1109
1110 if !storage.exists() {
1111 anyhow::bail!("OpenCode storage does not exist");
1112 }
1113
1114 let oc_session = storage.last_session_for_directory(dir).await?;
1115 Self::from_opencode(&oc_session.id, &storage).await
1116 }
1117
1118 pub async fn delete(id: &str) -> Result<()> {
1120 let path = Self::session_path(id)?;
1121 if path.exists() {
1122 tokio::fs::remove_file(&path).await?;
1123 }
1124 Ok(())
1125 }
1126
1127 fn sessions_dir() -> Result<PathBuf> {
1129 crate::config::Config::data_dir()
1130 .map(|d| d.join("sessions"))
1131 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
1132 }
1133
1134 fn session_path(id: &str) -> Result<PathBuf> {
1136 Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
1137 }
1138
/// Returns the event-stream base directory from `CODETETHER_EVENT_STREAM_PATH`.
///
/// Yields `None` when the variable is unset or is not valid Unicode.
fn event_stream_path() -> Option<PathBuf> {
    match std::env::var("CODETETHER_EVENT_STREAM_PATH") {
        Ok(raw) => Some(PathBuf::from(raw)),
        Err(_) => None,
    }
}
1146}
1147
/// Outcome of a completed prompt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionResult {
    /// Assistant text collected over all steps of the run, trimmed.
    pub text: String,
    /// Id of the session that produced the result.
    pub session_id: String,
}
1154
/// Progress notifications emitted by `Session::prompt_with_events`.
#[derive(Debug, Clone)]
pub enum SessionEvent {
    /// Sent when the call starts and again at the start of every agent step.
    Thinking,
    /// Sent before a requested tool is handled; `arguments` is the JSON-encoded input.
    ToolCallStart { name: String, arguments: String },
    /// Sent after a tool call finished or was rejected, carrying the output
    /// that is returned to the model.
    ToolCallComplete {
        name: String,
        output: String,
        success: bool,
    },
    /// Trimmed assistant text of one step.
    TextChunk(String),
    /// Sent right after `TextChunk` with the same trimmed text.
    TextComplete(String),
    /// Trimmed concatenation of one step's thinking parts.
    ThinkingComplete(String),
    /// Token counts and wall-clock duration of one completion request.
    UsageReport {
        prompt_tokens: usize,
        completion_tokens: usize,
        duration_ms: u64,
        model: String,
    },
    /// Snapshot of the session, sent after it has been saved.
    SessionSync(Session),
    /// Last event of a successful run.
    Done,
    /// NOTE(review): not emitted by any code visible in this file — confirm who produces it.
    Error(String),
}
1188
1189pub async fn list_sessions() -> Result<Vec<SessionSummary>> {
1191 let sessions_dir = crate::config::Config::data_dir()
1192 .map(|d| d.join("sessions"))
1193 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
1194
1195 if !sessions_dir.exists() {
1196 return Ok(Vec::new());
1197 }
1198
1199 let mut summaries = Vec::new();
1200 let mut entries = fs::read_dir(&sessions_dir).await?;
1201
1202 while let Some(entry) = entries.next_entry().await? {
1203 let path = entry.path();
1204 if path.extension().map(|e| e == "json").unwrap_or(false) {
1205 if let Ok(content) = fs::read_to_string(&path).await {
1206 if let Ok(session) = serde_json::from_str::<Session>(&content) {
1207 summaries.push(SessionSummary {
1208 id: session.id,
1209 title: session.title,
1210 created_at: session.created_at,
1211 updated_at: session.updated_at,
1212 message_count: session.messages.len(),
1213 agent: session.agent,
1214 directory: session.metadata.directory,
1215 });
1216 }
1217 }
1218 }
1219 }
1220
1221 summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1222 Ok(summaries)
1223}
1224
1225pub async fn list_sessions_for_directory(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
1230 let all = list_sessions().await?;
1231 let canonical = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
1232 Ok(all
1233 .into_iter()
1234 .filter(|s| {
1235 s.directory
1236 .as_ref()
1237 .map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()) == canonical)
1238 .unwrap_or(false)
1239 })
1240 .collect())
1241}
1242
1243pub async fn list_sessions_with_opencode(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
1249 let mut sessions = list_sessions_for_directory(dir).await?;
1250
1251 if let Some(storage) = crate::opencode::OpenCodeStorage::new() {
1253 if storage.exists() {
1254 if let Ok(oc_sessions) = storage.list_sessions_for_directory(dir).await {
1255 for oc in oc_sessions {
1256 let import_id = format!("opencode_{}", oc.id);
1258 if sessions.iter().any(|s| s.id == import_id) {
1259 continue;
1260 }
1261
1262 sessions.push(SessionSummary {
1263 id: import_id,
1264 title: Some(format!("[opencode] {}", oc.title)),
1265 created_at: oc.created_at,
1266 updated_at: oc.updated_at,
1267 message_count: oc.message_count,
1268 agent: "build".to_string(),
1269 directory: Some(PathBuf::from(&oc.directory)),
1270 });
1271 }
1272 }
1273 }
1274 }
1275
1276 sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1278 Ok(sessions)
1279}
1280
/// Lightweight description of a stored session, as returned by the listing functions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    /// Session id; imported OpenCode sessions use an `opencode_` prefix.
    pub id: String,
    /// Session title, if one has been set.
    pub title: Option<String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Timestamp of the last update (UTC); listings sort on this, newest first.
    pub updated_at: DateTime<Utc>,
    /// Number of messages in the session.
    pub message_count: usize,
    /// Name of the agent the session belongs to.
    pub agent: String,
    /// Directory recorded for the session; defaults to `None` when absent
    /// from serialized data.
    #[serde(default)]
    pub directory: Option<PathBuf>,
}
1294
/// Shortens `value` to at most `max_chars` characters, appending `...` when
/// something was cut off.
///
/// Counts Unicode scalar values, not bytes. Strings of `max_chars`
/// characters or fewer are returned unchanged; `max_chars == 0` always
/// yields an empty string.
fn truncate_with_ellipsis(value: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // Byte offset of the first character beyond the limit, if there is one.
    match value.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}...", &value[..cut]),
        None => value.to_string(),
    }
}
1316
1317#[allow(dead_code)]
1319use futures::StreamExt;
1320
/// Collects every item of an asynchronous stream into a `Vec`.
///
/// NOTE(review): marked `dead_code`; no caller is visible in this file.
#[allow(dead_code)]
trait AsyncCollect<T> {
    /// Consumes `self` and returns all items it yields, in order.
    async fn collect(self) -> Vec<T>;
}
1325
1326#[allow(dead_code)]
1327impl<S, T> AsyncCollect<T> for S
1328where
1329 S: futures::Stream<Item = T> + Unpin,
1330{
1331 async fn collect(mut self) -> Vec<T> {
1332 let mut items = Vec::new();
1333 while let Some(item) = self.next().await {
1334 items.push(item);
1335 }
1336 items
1337 }
1338}