1use crate::agent::ToolUse;
6use crate::provider::{Message, Usage};
7use crate::tool::ToolRegistry;
8use anyhow::Result;
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::path::PathBuf;
12use std::sync::Arc;
13use tokio::fs;
14use uuid::Uuid;
15
16#[cfg(feature = "functiongemma")]
17use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
18
/// Reports whether `tool_name` names a tool that needs live user interaction.
///
/// Only the `"question"` tool is currently treated as interactive.
fn is_interactive_tool(tool_name: &str) -> bool {
    tool_name == "question"
}
22
/// A persisted conversation between the user and an agent.
///
/// Sessions are serialized as JSON (see `Session::save` / `Session::load`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    /// Unique identifier; `Session::new` assigns a random UUID v4 string.
    pub id: String,
    /// Human-readable title; `None` until one is generated or set.
    pub title: Option<String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Timestamp of the last mutation made through the session's methods (UTC).
    pub updated_at: DateTime<Utc>,
    /// Conversation history in chronological order.
    pub messages: Vec<Message>,
    /// Recorded tool invocations.
    // NOTE(review): nothing in this file appends to `tool_uses` — confirm who populates it.
    pub tool_uses: Vec<ToolUse>,
    /// Token usage for the session.
    // NOTE(review): the prompt loops in this file never update this field — confirm intent.
    pub usage: Usage,
    /// Name of the agent driving the session (`"build"` for new sessions).
    pub agent: String,
    /// Additional session settings (directory, model override, sharing).
    pub metadata: SessionMetadata,
}
36
/// Optional settings stored alongside a [`Session`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionMetadata {
    /// Directory the session belongs to; `Session::new` records the current
    /// working directory here, and it is used to match sessions to a workspace.
    pub directory: Option<PathBuf>,
    /// Model override. The prompt methods pass it to `parse_model_string`, and a
    /// value without a provider part that equals a known provider name selects
    /// that provider with its default model.
    pub model: Option<String>,
    /// Whether the session has been shared.
    // NOTE(review): not read or written anywhere in this file — confirm semantics.
    pub shared: bool,
    /// URL of the shared session, if any.
    // NOTE(review): not read or written anywhere in this file — confirm semantics.
    pub share_url: Option<String>,
}
44
45impl Session {
46 fn default_model_for_provider(provider: &str) -> String {
47 match provider {
48 "moonshotai" => "kimi-k2.5".to_string(),
49 "anthropic" => "claude-sonnet-4-20250514".to_string(),
50 "openai" => "gpt-4o".to_string(),
51 "google" => "gemini-2.5-pro".to_string(),
52 "zhipuai" => "glm-4.7".to_string(),
53 "openrouter" => "z-ai/glm-4.7".to_string(),
54 "novita" => "qwen/qwen3-coder-next".to_string(),
55 "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini".to_string(),
56 _ => "glm-4.7".to_string(),
57 }
58 }
59
60 pub async fn new() -> Result<Self> {
62 let id = Uuid::new_v4().to_string();
63 let now = Utc::now();
64
65 Ok(Self {
66 id,
67 title: None,
68 created_at: now,
69 updated_at: now,
70 messages: Vec::new(),
71 tool_uses: Vec::new(),
72 usage: Usage::default(),
73 agent: "build".to_string(),
74 metadata: SessionMetadata {
75 directory: Some(std::env::current_dir()?),
76 ..Default::default()
77 },
78 })
79 }
80
81 pub async fn load(id: &str) -> Result<Self> {
83 let path = Self::session_path(id)?;
84 let content = fs::read_to_string(&path).await?;
85 let session: Session = serde_json::from_str(&content)?;
86 Ok(session)
87 }
88
89 pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
94 let sessions_dir = Self::sessions_dir()?;
95
96 if !sessions_dir.exists() {
97 anyhow::bail!("No sessions found");
98 }
99
100 let mut entries: Vec<tokio::fs::DirEntry> = Vec::new();
101 let mut read_dir = fs::read_dir(&sessions_dir).await?;
102 while let Some(entry) = read_dir.next_entry().await? {
103 entries.push(entry);
104 }
105
106 if entries.is_empty() {
107 anyhow::bail!("No sessions found");
108 }
109
110 entries.sort_by_key(|e| {
113 std::cmp::Reverse(
114 std::fs::metadata(e.path())
115 .ok()
116 .and_then(|m| m.modified().ok())
117 .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
118 )
119 });
120
121 let canonical_workspace =
122 workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
123
124 for entry in &entries {
125 let content: String = fs::read_to_string(entry.path()).await?;
126 if let Ok(session) = serde_json::from_str::<Session>(&content) {
127 if let Some(ref ws) = canonical_workspace {
129 if let Some(ref dir) = session.metadata.directory {
130 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
131 if &canonical_dir == ws {
132 return Ok(session);
133 }
134 }
135 continue;
136 }
137 return Ok(session);
138 }
139 }
140
141 anyhow::bail!("No sessions found")
142 }
143
144 pub async fn last() -> Result<Self> {
146 Self::last_for_directory(None).await
147 }
148
149 pub async fn save(&self) -> Result<()> {
151 let path = Self::session_path(&self.id)?;
152
153 if let Some(parent) = path.parent() {
154 fs::create_dir_all(parent).await?;
155 }
156
157 let content = serde_json::to_string_pretty(self)?;
158 fs::write(&path, content).await?;
159
160 Ok(())
161 }
162
163 pub fn add_message(&mut self, message: Message) {
165 self.messages.push(message);
166 self.updated_at = Utc::now();
167 }
168
169 pub async fn prompt(&mut self, message: &str) -> Result<SessionResult> {
171 use crate::provider::{
172 CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
173 };
174
175 let registry = ProviderRegistry::from_vault().await?;
177
178 let providers = registry.list();
179 if providers.is_empty() {
180 anyhow::bail!(
181 "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
182 );
183 }
184
185 tracing::info!("Available providers: {:?}", providers);
186
187 let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
189 let (prov, model) = parse_model_string(model_str);
190 if prov.is_some() {
191 (prov.map(|s| s.to_string()), model.to_string())
193 } else if providers.contains(&model) {
194 (Some(model.to_string()), String::new())
196 } else {
197 (None, model.to_string())
199 }
200 } else {
201 (None, String::new())
202 };
203
204 let selected_provider = provider_name
206 .as_deref()
207 .filter(|p| providers.contains(p))
208 .or_else(|| {
209 if providers.contains(&"zhipuai") {
210 Some("zhipuai")
211 } else {
212 providers.first().copied()
213 }
214 })
215 .ok_or_else(|| anyhow::anyhow!("No providers available"))?;
216
217 let provider = registry
218 .get(selected_provider)
219 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
220
221 self.add_message(Message {
223 role: Role::User,
224 content: vec![ContentPart::Text {
225 text: message.to_string(),
226 }],
227 });
228
229 if self.title.is_none() {
231 self.generate_title().await?;
232 }
233
234 let model = if !model_id.is_empty() {
236 model_id
237 } else {
238 Self::default_model_for_provider(selected_provider)
239 };
240
241 let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
243 let tool_definitions: Vec<_> = tool_registry
244 .definitions()
245 .into_iter()
246 .filter(|tool| !is_interactive_tool(&tool.name))
247 .collect();
248
249 let temperature = if model.starts_with("kimi-k2") {
251 Some(1.0)
252 } else {
253 Some(0.7)
254 };
255
256 tracing::info!("Using model: {} via provider: {}", model, selected_provider);
257 tracing::info!("Available tools: {}", tool_definitions.len());
258
259 #[cfg(feature = "functiongemma")]
262 let model_supports_tools = provider
263 .list_models()
264 .await
265 .unwrap_or_default()
266 .iter()
267 .find(|m| m.id == model)
268 .map(|m| m.supports_tools)
269 .unwrap_or(true); let cwd = self
273 .metadata
274 .directory
275 .clone()
276 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
277 let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);
278
279 let max_steps = 50;
281 let mut final_output = String::new();
282
283 #[cfg(feature = "functiongemma")]
285 let tool_router: Option<ToolCallRouter> = {
286 let cfg = ToolRouterConfig::from_env();
287 match ToolCallRouter::from_config(&cfg) {
288 Ok(r) => r,
289 Err(e) => {
290 tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
291 None
292 }
293 }
294 };
295
296 for step in 1..=max_steps {
297 tracing::info!(step = step, "Agent step starting");
298
299 let mut messages = vec![Message {
301 role: Role::System,
302 content: vec![ContentPart::Text {
303 text: system_prompt.clone(),
304 }],
305 }];
306 messages.extend(self.messages.clone());
307
308 let request = CompletionRequest {
310 messages,
311 tools: tool_definitions.clone(),
312 model: model.clone(),
313 temperature,
314 top_p: None,
315 max_tokens: Some(8192),
316 stop: Vec::new(),
317 };
318
319 let response = provider.complete(request).await?;
321
322 #[cfg(feature = "functiongemma")]
326 let response = if let Some(ref router) = tool_router {
327 router
328 .maybe_reformat(response, &tool_definitions, model_supports_tools)
329 .await
330 } else {
331 response
332 };
333
334 crate::telemetry::TOKEN_USAGE.record_model_usage(
336 &model,
337 response.usage.prompt_tokens as u64,
338 response.usage.completion_tokens as u64,
339 );
340
341 let tool_calls: Vec<(String, String, serde_json::Value)> = response
343 .message
344 .content
345 .iter()
346 .filter_map(|part| {
347 if let ContentPart::ToolCall {
348 id,
349 name,
350 arguments,
351 } = part
352 {
353 let args: serde_json::Value =
355 serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
356 Some((id.clone(), name.clone(), args))
357 } else {
358 None
359 }
360 })
361 .collect();
362
363 for part in &response.message.content {
365 if let ContentPart::Text { text } = part {
366 if !text.is_empty() {
367 final_output.push_str(text);
368 final_output.push('\n');
369 }
370 }
371 }
372
373 if tool_calls.is_empty() {
375 self.add_message(response.message.clone());
376 break;
377 }
378
379 self.add_message(response.message.clone());
381
382 tracing::info!(
383 step = step,
384 num_tools = tool_calls.len(),
385 "Executing tool calls"
386 );
387
388 for (tool_id, tool_name, tool_input) in tool_calls {
390 tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
391
392 if is_interactive_tool(&tool_name) {
393 tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
394 self.add_message(Message {
395 role: Role::Tool,
396 content: vec![ContentPart::ToolResult {
397 tool_call_id: tool_id,
398 content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
399 }],
400 });
401 continue;
402 }
403
404 let content = if let Some(tool) = tool_registry.get(&tool_name) {
406 match tool.execute(tool_input.clone()).await {
407 Ok(result) => {
408 tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
409 result.output
410 }
411 Err(e) => {
412 tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
413 format!("Error: {}", e)
414 }
415 }
416 } else {
417 tracing::warn!(tool = %tool_name, "Tool not found");
418 format!("Error: Unknown tool '{}'", tool_name)
419 };
420
421 self.add_message(Message {
423 role: Role::Tool,
424 content: vec![ContentPart::ToolResult {
425 tool_call_id: tool_id,
426 content,
427 }],
428 });
429 }
430 }
431
432 self.save().await?;
434
435 Ok(SessionResult {
436 text: final_output.trim().to_string(),
437 session_id: self.id.clone(),
438 })
439 }
440
441 pub async fn prompt_with_events(
444 &mut self,
445 message: &str,
446 event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
447 ) -> Result<SessionResult> {
448 use crate::provider::{
449 CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
450 };
451
452 let _ = event_tx.send(SessionEvent::Thinking).await;
453
454 let registry = ProviderRegistry::from_vault().await?;
456 let providers = registry.list();
457 if providers.is_empty() {
458 anyhow::bail!(
459 "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
460 );
461 }
462 tracing::info!("Available providers: {:?}", providers);
463
464 let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
466 let (prov, model) = parse_model_string(model_str);
467 if prov.is_some() {
468 (prov.map(|s| s.to_string()), model.to_string())
469 } else if providers.contains(&model) {
470 (Some(model.to_string()), String::new())
471 } else {
472 (None, model.to_string())
473 }
474 } else {
475 (None, String::new())
476 };
477
478 let selected_provider = provider_name
480 .as_deref()
481 .filter(|p| providers.contains(p))
482 .or_else(|| {
483 if providers.contains(&"zhipuai") {
484 Some("zhipuai")
485 } else {
486 providers.first().copied()
487 }
488 })
489 .ok_or_else(|| anyhow::anyhow!("No providers available"))?;
490
491 let provider = registry
492 .get(selected_provider)
493 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
494
495 self.add_message(Message {
497 role: Role::User,
498 content: vec![ContentPart::Text {
499 text: message.to_string(),
500 }],
501 });
502
503 if self.title.is_none() {
505 self.generate_title().await?;
506 }
507
508 let model = if !model_id.is_empty() {
510 model_id
511 } else {
512 Self::default_model_for_provider(selected_provider)
513 };
514
515 let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
517 let tool_definitions: Vec<_> = tool_registry
518 .definitions()
519 .into_iter()
520 .filter(|tool| !is_interactive_tool(&tool.name))
521 .collect();
522
523 let temperature = if model.starts_with("kimi-k2") {
524 Some(1.0)
525 } else {
526 Some(0.7)
527 };
528
529 tracing::info!("Using model: {} via provider: {}", model, selected_provider);
530 tracing::info!("Available tools: {}", tool_definitions.len());
531
532 #[cfg(feature = "functiongemma")]
534 let model_supports_tools = provider
535 .list_models()
536 .await
537 .unwrap_or_default()
538 .iter()
539 .find(|m| m.id == model)
540 .map(|m| m.supports_tools)
541 .unwrap_or(true);
542
543 let cwd = std::env::var("PWD")
545 .map(std::path::PathBuf::from)
546 .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
547 let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);
548
549 let mut final_output = String::new();
550 let max_steps = 50;
551
552 #[cfg(feature = "functiongemma")]
554 let tool_router: Option<ToolCallRouter> = {
555 let cfg = ToolRouterConfig::from_env();
556 match ToolCallRouter::from_config(&cfg) {
557 Ok(r) => r,
558 Err(e) => {
559 tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
560 None
561 }
562 }
563 };
564
565 for step in 1..=max_steps {
566 tracing::info!(step = step, "Agent step starting");
567 let _ = event_tx.send(SessionEvent::Thinking).await;
568
569 let mut messages = vec![Message {
571 role: Role::System,
572 content: vec![ContentPart::Text {
573 text: system_prompt.clone(),
574 }],
575 }];
576 messages.extend(self.messages.clone());
577
578 let request = CompletionRequest {
579 messages,
580 tools: tool_definitions.clone(),
581 model: model.clone(),
582 temperature,
583 top_p: None,
584 max_tokens: Some(8192),
585 stop: Vec::new(),
586 };
587
588 let response = provider.complete(request).await?;
589
590 #[cfg(feature = "functiongemma")]
593 let response = if let Some(ref router) = tool_router {
594 router
595 .maybe_reformat(response, &tool_definitions, model_supports_tools)
596 .await
597 } else {
598 response
599 };
600
601 crate::telemetry::TOKEN_USAGE.record_model_usage(
602 &model,
603 response.usage.prompt_tokens as u64,
604 response.usage.completion_tokens as u64,
605 );
606
607 let tool_calls: Vec<(String, String, serde_json::Value)> = response
609 .message
610 .content
611 .iter()
612 .filter_map(|part| {
613 if let ContentPart::ToolCall {
614 id,
615 name,
616 arguments,
617 } = part
618 {
619 let args: serde_json::Value =
620 serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
621 Some((id.clone(), name.clone(), args))
622 } else {
623 None
624 }
625 })
626 .collect();
627
628 for part in &response.message.content {
630 if let ContentPart::Text { text } = part {
631 if !text.is_empty() {
632 final_output.push_str(text);
633 final_output.push('\n');
634 let _ = event_tx.send(SessionEvent::TextChunk(text.clone())).await;
635 }
636 }
637 }
638
639 if tool_calls.is_empty() {
640 self.add_message(response.message.clone());
641 break;
642 }
643
644 self.add_message(response.message.clone());
645
646 tracing::info!(
647 step = step,
648 num_tools = tool_calls.len(),
649 "Executing tool calls"
650 );
651
652 for (tool_id, tool_name, tool_input) in tool_calls {
654 let args_str = serde_json::to_string(&tool_input).unwrap_or_default();
655 let _ = event_tx
656 .send(SessionEvent::ToolCallStart {
657 name: tool_name.clone(),
658 arguments: args_str,
659 })
660 .await;
661
662 tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
663
664 if is_interactive_tool(&tool_name) {
665 tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
666 let content = "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string();
667 let _ = event_tx
668 .send(SessionEvent::ToolCallComplete {
669 name: tool_name.clone(),
670 output: content.clone(),
671 success: false,
672 })
673 .await;
674 self.add_message(Message {
675 role: Role::Tool,
676 content: vec![ContentPart::ToolResult {
677 tool_call_id: tool_id,
678 content,
679 }],
680 });
681 continue;
682 }
683
684 let (content, success) = if let Some(tool) = tool_registry.get(&tool_name) {
685 match tool.execute(tool_input.clone()).await {
686 Ok(result) => {
687 tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
688 (result.output, result.success)
689 }
690 Err(e) => {
691 tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
692 (format!("Error: {}", e), false)
693 }
694 }
695 } else {
696 tracing::warn!(tool = %tool_name, "Tool not found");
697 (format!("Error: Unknown tool '{}'", tool_name), false)
698 };
699
700 let _ = event_tx
701 .send(SessionEvent::ToolCallComplete {
702 name: tool_name.clone(),
703 output: content.clone(),
704 success,
705 })
706 .await;
707
708 self.add_message(Message {
709 role: Role::Tool,
710 content: vec![ContentPart::ToolResult {
711 tool_call_id: tool_id,
712 content,
713 }],
714 });
715 }
716 }
717
718 self.save().await?;
719
720 let _ = event_tx
721 .send(SessionEvent::TextComplete(final_output.trim().to_string()))
722 .await;
723 let _ = event_tx.send(SessionEvent::Done).await;
724
725 Ok(SessionResult {
726 text: final_output.trim().to_string(),
727 session_id: self.id.clone(),
728 })
729 }
730
731 pub async fn generate_title(&mut self) -> Result<()> {
734 if self.title.is_some() {
735 return Ok(());
736 }
737
738 let first_message = self
740 .messages
741 .iter()
742 .find(|m| m.role == crate::provider::Role::User);
743
744 if let Some(msg) = first_message {
745 let text: String = msg
746 .content
747 .iter()
748 .filter_map(|p| match p {
749 crate::provider::ContentPart::Text { text } => Some(text.clone()),
750 _ => None,
751 })
752 .collect::<Vec<_>>()
753 .join(" ");
754
755 self.title = Some(truncate_with_ellipsis(&text, 47));
757 }
758
759 Ok(())
760 }
761
762 pub async fn regenerate_title(&mut self) -> Result<()> {
765 let first_message = self
767 .messages
768 .iter()
769 .find(|m| m.role == crate::provider::Role::User);
770
771 if let Some(msg) = first_message {
772 let text: String = msg
773 .content
774 .iter()
775 .filter_map(|p| match p {
776 crate::provider::ContentPart::Text { text } => Some(text.clone()),
777 _ => None,
778 })
779 .collect::<Vec<_>>()
780 .join(" ");
781
782 self.title = Some(truncate_with_ellipsis(&text, 47));
784 }
785
786 Ok(())
787 }
788
789 pub fn set_title(&mut self, title: impl Into<String>) {
791 self.title = Some(title.into());
792 self.updated_at = Utc::now();
793 }
794
795 pub fn clear_title(&mut self) {
797 self.title = None;
798 self.updated_at = Utc::now();
799 }
800
801 pub async fn on_context_change(&mut self, regenerate_title: bool) -> Result<()> {
804 self.updated_at = Utc::now();
805
806 if regenerate_title {
807 self.regenerate_title().await?;
808 }
809
810 Ok(())
811 }
812
813 fn sessions_dir() -> Result<PathBuf> {
815 crate::config::Config::data_dir()
816 .map(|d| d.join("sessions"))
817 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
818 }
819
820 fn session_path(id: &str) -> Result<PathBuf> {
822 Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
823 }
824}
825
/// Outcome of a prompt run on a [`Session`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionResult {
    /// Concatenated assistant text from all steps, trimmed of surrounding whitespace.
    pub text: String,
    /// Id of the session that produced the result.
    pub session_id: String,
}
832
/// Progress notifications emitted by `Session::prompt_with_events`.
#[derive(Debug, Clone)]
pub enum SessionEvent {
    /// Sent before provider setup and at the start of every agent step.
    Thinking,
    /// A tool call is about to be handled; `arguments` is the JSON-serialized input.
    ToolCallStart { name: String, arguments: String },
    /// A tool call was handled; `success` is `false` for failed, unknown or
    /// blocked tools, and `output` is the text returned to the model.
    ToolCallComplete {
        name: String,
        output: String,
        success: bool,
    },
    /// One non-empty text part of an assistant response.
    TextChunk(String),
    /// The complete, trimmed assistant text; sent once after the session is saved.
    TextComplete(String),
    /// Final event of a successful run.
    Done,
    /// An error description.
    // NOTE(review): never emitted by the code in this file — presumably sent by callers; confirm.
    Error(String),
}
855
856pub async fn list_sessions() -> Result<Vec<SessionSummary>> {
858 let sessions_dir = crate::config::Config::data_dir()
859 .map(|d| d.join("sessions"))
860 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
861
862 if !sessions_dir.exists() {
863 return Ok(Vec::new());
864 }
865
866 let mut summaries = Vec::new();
867 let mut entries = fs::read_dir(&sessions_dir).await?;
868
869 while let Some(entry) = entries.next_entry().await? {
870 let path = entry.path();
871 if path.extension().map(|e| e == "json").unwrap_or(false) {
872 if let Ok(content) = fs::read_to_string(&path).await {
873 if let Ok(session) = serde_json::from_str::<Session>(&content) {
874 summaries.push(SessionSummary {
875 id: session.id,
876 title: session.title,
877 created_at: session.created_at,
878 updated_at: session.updated_at,
879 message_count: session.messages.len(),
880 agent: session.agent,
881 directory: session.metadata.directory,
882 });
883 }
884 }
885 }
886 }
887
888 summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
889 Ok(summaries)
890}
891
892pub async fn list_sessions_for_directory(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
897 let all = list_sessions().await?;
898 let canonical = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
899 Ok(all
900 .into_iter()
901 .filter(|s| {
902 s.directory
903 .as_ref()
904 .map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()) == canonical)
905 .unwrap_or(false)
906 })
907 .collect())
908}
909
/// Lightweight description of a stored [`Session`], as produced by [`list_sessions`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    /// Session id.
    pub id: String,
    /// Session title, if one has been set.
    pub title: Option<String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Last-update timestamp (UTC); `list_sessions` sorts on this, newest first.
    pub updated_at: DateTime<Utc>,
    /// Number of messages in the session history.
    pub message_count: usize,
    /// Name of the agent driving the session.
    pub agent: String,
    /// Directory recorded in the session metadata; defaults to `None` when the
    /// field is absent from serialized data.
    #[serde(default)]
    pub directory: Option<PathBuf>,
}
923
/// Shortens `value` to at most `max_chars` characters, appending `"..."` when
/// anything was cut off.
///
/// Counting is done in `char`s, so multi-byte characters are never split. A
/// `max_chars` of zero yields an empty string; note that a truncated result is
/// up to three characters longer than `max_chars` because of the ellipsis.
fn truncate_with_ellipsis(value: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // Byte offset of the first character that does not fit, if there is one.
    match value.char_indices().nth(max_chars) {
        Some((cut, _)) => {
            let kept = &value[..cut];
            format!("{kept}...")
        }
        None => value.to_owned(),
    }
}
945
946#[allow(dead_code)]
948use futures::StreamExt;
949
/// Helper trait for draining an asynchronous stream into a `Vec`.
// NOTE(review): marked `dead_code`; nothing in this file calls it — confirm it is still needed.
#[allow(dead_code)]
trait AsyncCollect<T> {
    /// Consumes `self` and returns every item it yields, in order.
    async fn collect(self) -> Vec<T>;
}
954
955#[allow(dead_code)]
956impl<S, T> AsyncCollect<T> for S
957where
958 S: futures::Stream<Item = T> + Unpin,
959{
960 async fn collect(mut self) -> Vec<T> {
961 let mut items = Vec::new();
962 while let Some(item) = self.next().await {
963 items.push(item);
964 }
965 items
966 }
967}