1use crate::agent::ToolUse;
6use crate::provider::{Message, Usage};
7use crate::tool::ToolRegistry;
8use anyhow::Result;
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::path::PathBuf;
12use std::sync::Arc;
13use tokio::fs;
14use uuid::Uuid;
15
16#[cfg(feature = "functiongemma")]
17use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
18
19fn is_interactive_tool(tool_name: &str) -> bool {
20 matches!(tool_name, "question")
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct Session {
26 pub id: String,
27 pub title: Option<String>,
28 pub created_at: DateTime<Utc>,
29 pub updated_at: DateTime<Utc>,
30 pub messages: Vec<Message>,
31 pub tool_uses: Vec<ToolUse>,
32 pub usage: Usage,
33 pub agent: String,
34 pub metadata: SessionMetadata,
35}
36
37#[derive(Debug, Clone, Default, Serialize, Deserialize)]
38pub struct SessionMetadata {
39 pub directory: Option<PathBuf>,
40 pub model: Option<String>,
41 pub shared: bool,
42 pub share_url: Option<String>,
43}
44
45impl Session {
46 fn default_model_for_provider(provider: &str) -> String {
47 match provider {
48 "moonshotai" => "kimi-k2.5".to_string(),
49 "anthropic" => "claude-sonnet-4-20250514".to_string(),
50 "openai" => "gpt-4o".to_string(),
51 "google" => "gemini-2.5-pro".to_string(),
52 "zhipuai" => "glm-4.7".to_string(),
53 "openrouter" => "z-ai/glm-4.7".to_string(),
54 "novita" => "qwen/qwen3-coder-next".to_string(),
55 "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini".to_string(),
56 _ => "glm-4.7".to_string(),
57 }
58 }
59
60 pub async fn new() -> Result<Self> {
62 let id = Uuid::new_v4().to_string();
63 let now = Utc::now();
64
65 Ok(Self {
66 id,
67 title: None,
68 created_at: now,
69 updated_at: now,
70 messages: Vec::new(),
71 tool_uses: Vec::new(),
72 usage: Usage::default(),
73 agent: "build".to_string(),
74 metadata: SessionMetadata {
75 directory: Some(std::env::current_dir()?),
76 ..Default::default()
77 },
78 })
79 }
80
81 pub async fn load(id: &str) -> Result<Self> {
83 let path = Self::session_path(id)?;
84 let content = fs::read_to_string(&path).await?;
85 let session: Session = serde_json::from_str(&content)?;
86 Ok(session)
87 }
88
89 pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
94 let sessions_dir = Self::sessions_dir()?;
95
96 if !sessions_dir.exists() {
97 anyhow::bail!("No sessions found");
98 }
99
100 let mut entries: Vec<tokio::fs::DirEntry> = Vec::new();
101 let mut read_dir = fs::read_dir(&sessions_dir).await?;
102 while let Some(entry) = read_dir.next_entry().await? {
103 entries.push(entry);
104 }
105
106 if entries.is_empty() {
107 anyhow::bail!("No sessions found");
108 }
109
110 entries.sort_by_key(|e| {
113 std::cmp::Reverse(
114 std::fs::metadata(e.path())
115 .ok()
116 .and_then(|m| m.modified().ok())
117 .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
118 )
119 });
120
121 let canonical_workspace = workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
122
123 for entry in &entries {
124 let content: String = fs::read_to_string(entry.path()).await?;
125 if let Ok(session) = serde_json::from_str::<Session>(&content) {
126 if let Some(ref ws) = canonical_workspace {
128 if let Some(ref dir) = session.metadata.directory {
129 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
130 if &canonical_dir == ws {
131 return Ok(session);
132 }
133 }
134 continue;
135 }
136 return Ok(session);
137 }
138 }
139
140 anyhow::bail!("No sessions found")
141 }
142
143 pub async fn last() -> Result<Self> {
145 Self::last_for_directory(None).await
146 }
147
148 pub async fn save(&self) -> Result<()> {
150 let path = Self::session_path(&self.id)?;
151
152 if let Some(parent) = path.parent() {
153 fs::create_dir_all(parent).await?;
154 }
155
156 let content = serde_json::to_string_pretty(self)?;
157 fs::write(&path, content).await?;
158
159 Ok(())
160 }
161
162 pub fn add_message(&mut self, message: Message) {
164 self.messages.push(message);
165 self.updated_at = Utc::now();
166 }
167
168 pub async fn prompt(&mut self, message: &str) -> Result<SessionResult> {
170 use crate::provider::{
171 CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
172 };
173
174 let registry = ProviderRegistry::from_vault().await?;
176
177 let providers = registry.list();
178 if providers.is_empty() {
179 anyhow::bail!(
180 "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
181 );
182 }
183
184 tracing::info!("Available providers: {:?}", providers);
185
186 let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
188 let (prov, model) = parse_model_string(model_str);
189 if prov.is_some() {
190 (prov.map(|s| s.to_string()), model.to_string())
192 } else if providers.contains(&model) {
193 (Some(model.to_string()), String::new())
195 } else {
196 (None, model.to_string())
198 }
199 } else {
200 (None, String::new())
201 };
202
203 let selected_provider = provider_name
205 .as_deref()
206 .filter(|p| providers.contains(p))
207 .or_else(|| {
208 if providers.contains(&"zhipuai") {
209 Some("zhipuai")
210 } else {
211 providers.first().copied()
212 }
213 })
214 .ok_or_else(|| anyhow::anyhow!("No providers available"))?;
215
216 let provider = registry
217 .get(selected_provider)
218 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
219
220 self.add_message(Message {
222 role: Role::User,
223 content: vec![ContentPart::Text {
224 text: message.to_string(),
225 }],
226 });
227
228 if self.title.is_none() {
230 self.generate_title().await?;
231 }
232
233 let model = if !model_id.is_empty() {
235 model_id
236 } else {
237 Self::default_model_for_provider(selected_provider)
238 };
239
240 let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
242 let tool_definitions: Vec<_> = tool_registry
243 .definitions()
244 .into_iter()
245 .filter(|tool| !is_interactive_tool(&tool.name))
246 .collect();
247
248 let temperature = if model.starts_with("kimi-k2") {
250 Some(1.0)
251 } else {
252 Some(0.7)
253 };
254
255 tracing::info!("Using model: {} via provider: {}", model, selected_provider);
256 tracing::info!("Available tools: {}", tool_definitions.len());
257
258 let cwd = self
260 .metadata
261 .directory
262 .clone()
263 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
264 let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);
265
266 let max_steps = 50;
268 let mut final_output = String::new();
269
270 #[cfg(feature = "functiongemma")]
272 let tool_router: Option<ToolCallRouter> = {
273 let cfg = ToolRouterConfig::from_env();
274 match ToolCallRouter::from_config(&cfg) {
275 Ok(r) => r,
276 Err(e) => {
277 tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
278 None
279 }
280 }
281 };
282
283 for step in 1..=max_steps {
284 tracing::info!(step = step, "Agent step starting");
285
286 let mut messages = vec![Message {
288 role: Role::System,
289 content: vec![ContentPart::Text {
290 text: system_prompt.clone(),
291 }],
292 }];
293 messages.extend(self.messages.clone());
294
295 let request = CompletionRequest {
297 messages,
298 tools: tool_definitions.clone(),
299 model: model.clone(),
300 temperature,
301 top_p: None,
302 max_tokens: Some(8192),
303 stop: Vec::new(),
304 };
305
306 let response = provider.complete(request).await?;
308
309 #[cfg(feature = "functiongemma")]
312 let response = if let Some(ref router) = tool_router {
313 router.maybe_reformat(response, &tool_definitions).await
314 } else {
315 response
316 };
317
318 crate::telemetry::TOKEN_USAGE.record_model_usage(
320 &model,
321 response.usage.prompt_tokens as u64,
322 response.usage.completion_tokens as u64,
323 );
324
325 let tool_calls: Vec<(String, String, serde_json::Value)> = response
327 .message
328 .content
329 .iter()
330 .filter_map(|part| {
331 if let ContentPart::ToolCall {
332 id,
333 name,
334 arguments,
335 } = part
336 {
337 let args: serde_json::Value =
339 serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
340 Some((id.clone(), name.clone(), args))
341 } else {
342 None
343 }
344 })
345 .collect();
346
347 for part in &response.message.content {
349 if let ContentPart::Text { text } = part {
350 if !text.is_empty() {
351 final_output.push_str(text);
352 final_output.push('\n');
353 }
354 }
355 }
356
357 if tool_calls.is_empty() {
359 self.add_message(response.message.clone());
360 break;
361 }
362
363 self.add_message(response.message.clone());
365
366 tracing::info!(
367 step = step,
368 num_tools = tool_calls.len(),
369 "Executing tool calls"
370 );
371
372 for (tool_id, tool_name, tool_input) in tool_calls {
374 tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
375
376 if is_interactive_tool(&tool_name) {
377 tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
378 self.add_message(Message {
379 role: Role::Tool,
380 content: vec![ContentPart::ToolResult {
381 tool_call_id: tool_id,
382 content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
383 }],
384 });
385 continue;
386 }
387
388 let content = if let Some(tool) = tool_registry.get(&tool_name) {
390 match tool.execute(tool_input.clone()).await {
391 Ok(result) => {
392 tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
393 result.output
394 }
395 Err(e) => {
396 tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
397 format!("Error: {}", e)
398 }
399 }
400 } else {
401 tracing::warn!(tool = %tool_name, "Tool not found");
402 format!("Error: Unknown tool '{}'", tool_name)
403 };
404
405 self.add_message(Message {
407 role: Role::Tool,
408 content: vec![ContentPart::ToolResult {
409 tool_call_id: tool_id,
410 content,
411 }],
412 });
413 }
414 }
415
416 self.save().await?;
418
419 Ok(SessionResult {
420 text: final_output.trim().to_string(),
421 session_id: self.id.clone(),
422 })
423 }
424
425 pub async fn prompt_with_events(
428 &mut self,
429 message: &str,
430 event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
431 ) -> Result<SessionResult> {
432 use crate::provider::{
433 CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
434 };
435
436 let _ = event_tx.send(SessionEvent::Thinking).await;
437
438 let registry = ProviderRegistry::from_vault().await?;
440 let providers = registry.list();
441 if providers.is_empty() {
442 anyhow::bail!(
443 "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
444 );
445 }
446 tracing::info!("Available providers: {:?}", providers);
447
448 let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
450 let (prov, model) = parse_model_string(model_str);
451 if prov.is_some() {
452 (prov.map(|s| s.to_string()), model.to_string())
453 } else if providers.contains(&model) {
454 (Some(model.to_string()), String::new())
455 } else {
456 (None, model.to_string())
457 }
458 } else {
459 (None, String::new())
460 };
461
462 let selected_provider = provider_name
464 .as_deref()
465 .filter(|p| providers.contains(p))
466 .or_else(|| {
467 if providers.contains(&"zhipuai") {
468 Some("zhipuai")
469 } else {
470 providers.first().copied()
471 }
472 })
473 .ok_or_else(|| anyhow::anyhow!("No providers available"))?;
474
475 let provider = registry
476 .get(selected_provider)
477 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
478
479 self.add_message(Message {
481 role: Role::User,
482 content: vec![ContentPart::Text {
483 text: message.to_string(),
484 }],
485 });
486
487 if self.title.is_none() {
489 self.generate_title().await?;
490 }
491
492 let model = if !model_id.is_empty() {
494 model_id
495 } else {
496 Self::default_model_for_provider(selected_provider)
497 };
498
499 let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
501 let tool_definitions: Vec<_> = tool_registry
502 .definitions()
503 .into_iter()
504 .filter(|tool| !is_interactive_tool(&tool.name))
505 .collect();
506
507 let temperature = if model.starts_with("kimi-k2") {
508 Some(1.0)
509 } else {
510 Some(0.7)
511 };
512
513 tracing::info!("Using model: {} via provider: {}", model, selected_provider);
514 tracing::info!("Available tools: {}", tool_definitions.len());
515
516 let cwd = std::env::var("PWD")
518 .map(std::path::PathBuf::from)
519 .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
520 let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);
521
522 let mut final_output = String::new();
523 let max_steps = 50;
524
525 #[cfg(feature = "functiongemma")]
527 let tool_router: Option<ToolCallRouter> = {
528 let cfg = ToolRouterConfig::from_env();
529 match ToolCallRouter::from_config(&cfg) {
530 Ok(r) => r,
531 Err(e) => {
532 tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
533 None
534 }
535 }
536 };
537
538 for step in 1..=max_steps {
539 tracing::info!(step = step, "Agent step starting");
540 let _ = event_tx.send(SessionEvent::Thinking).await;
541
542 let mut messages = vec![Message {
544 role: Role::System,
545 content: vec![ContentPart::Text {
546 text: system_prompt.clone(),
547 }],
548 }];
549 messages.extend(self.messages.clone());
550
551 let request = CompletionRequest {
552 messages,
553 tools: tool_definitions.clone(),
554 model: model.clone(),
555 temperature,
556 top_p: None,
557 max_tokens: Some(8192),
558 stop: Vec::new(),
559 };
560
561 let response = provider.complete(request).await?;
562
563 #[cfg(feature = "functiongemma")]
566 let response = if let Some(ref router) = tool_router {
567 router.maybe_reformat(response, &tool_definitions).await
568 } else {
569 response
570 };
571
572 crate::telemetry::TOKEN_USAGE.record_model_usage(
573 &model,
574 response.usage.prompt_tokens as u64,
575 response.usage.completion_tokens as u64,
576 );
577
578 let tool_calls: Vec<(String, String, serde_json::Value)> = response
580 .message
581 .content
582 .iter()
583 .filter_map(|part| {
584 if let ContentPart::ToolCall {
585 id,
586 name,
587 arguments,
588 } = part
589 {
590 let args: serde_json::Value =
591 serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
592 Some((id.clone(), name.clone(), args))
593 } else {
594 None
595 }
596 })
597 .collect();
598
599 for part in &response.message.content {
601 if let ContentPart::Text { text } = part {
602 if !text.is_empty() {
603 final_output.push_str(text);
604 final_output.push('\n');
605 let _ = event_tx.send(SessionEvent::TextChunk(text.clone())).await;
606 }
607 }
608 }
609
610 if tool_calls.is_empty() {
611 self.add_message(response.message.clone());
612 break;
613 }
614
615 self.add_message(response.message.clone());
616
617 tracing::info!(
618 step = step,
619 num_tools = tool_calls.len(),
620 "Executing tool calls"
621 );
622
623 for (tool_id, tool_name, tool_input) in tool_calls {
625 let args_str = serde_json::to_string(&tool_input).unwrap_or_default();
626 let _ = event_tx
627 .send(SessionEvent::ToolCallStart {
628 name: tool_name.clone(),
629 arguments: args_str,
630 })
631 .await;
632
633 tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
634
635 if is_interactive_tool(&tool_name) {
636 tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
637 let content = "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string();
638 let _ = event_tx
639 .send(SessionEvent::ToolCallComplete {
640 name: tool_name.clone(),
641 output: content.clone(),
642 success: false,
643 })
644 .await;
645 self.add_message(Message {
646 role: Role::Tool,
647 content: vec![ContentPart::ToolResult {
648 tool_call_id: tool_id,
649 content,
650 }],
651 });
652 continue;
653 }
654
655 let (content, success) = if let Some(tool) = tool_registry.get(&tool_name) {
656 match tool.execute(tool_input.clone()).await {
657 Ok(result) => {
658 tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
659 (result.output, result.success)
660 }
661 Err(e) => {
662 tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
663 (format!("Error: {}", e), false)
664 }
665 }
666 } else {
667 tracing::warn!(tool = %tool_name, "Tool not found");
668 (format!("Error: Unknown tool '{}'", tool_name), false)
669 };
670
671 let _ = event_tx
672 .send(SessionEvent::ToolCallComplete {
673 name: tool_name.clone(),
674 output: content.clone(),
675 success,
676 })
677 .await;
678
679 self.add_message(Message {
680 role: Role::Tool,
681 content: vec![ContentPart::ToolResult {
682 tool_call_id: tool_id,
683 content,
684 }],
685 });
686 }
687 }
688
689 self.save().await?;
690
691 let _ = event_tx
692 .send(SessionEvent::TextComplete(final_output.trim().to_string()))
693 .await;
694 let _ = event_tx.send(SessionEvent::Done).await;
695
696 Ok(SessionResult {
697 text: final_output.trim().to_string(),
698 session_id: self.id.clone(),
699 })
700 }
701
702 pub async fn generate_title(&mut self) -> Result<()> {
705 if self.title.is_some() {
706 return Ok(());
707 }
708
709 let first_message = self
711 .messages
712 .iter()
713 .find(|m| m.role == crate::provider::Role::User);
714
715 if let Some(msg) = first_message {
716 let text: String = msg
717 .content
718 .iter()
719 .filter_map(|p| match p {
720 crate::provider::ContentPart::Text { text } => Some(text.clone()),
721 _ => None,
722 })
723 .collect::<Vec<_>>()
724 .join(" ");
725
726 self.title = Some(truncate_with_ellipsis(&text, 47));
728 }
729
730 Ok(())
731 }
732
733 pub async fn regenerate_title(&mut self) -> Result<()> {
736 let first_message = self
738 .messages
739 .iter()
740 .find(|m| m.role == crate::provider::Role::User);
741
742 if let Some(msg) = first_message {
743 let text: String = msg
744 .content
745 .iter()
746 .filter_map(|p| match p {
747 crate::provider::ContentPart::Text { text } => Some(text.clone()),
748 _ => None,
749 })
750 .collect::<Vec<_>>()
751 .join(" ");
752
753 self.title = Some(truncate_with_ellipsis(&text, 47));
755 }
756
757 Ok(())
758 }
759
760 pub fn set_title(&mut self, title: impl Into<String>) {
762 self.title = Some(title.into());
763 self.updated_at = Utc::now();
764 }
765
766 pub fn clear_title(&mut self) {
768 self.title = None;
769 self.updated_at = Utc::now();
770 }
771
772 pub async fn on_context_change(&mut self, regenerate_title: bool) -> Result<()> {
775 self.updated_at = Utc::now();
776
777 if regenerate_title {
778 self.regenerate_title().await?;
779 }
780
781 Ok(())
782 }
783
784 fn sessions_dir() -> Result<PathBuf> {
786 crate::config::Config::data_dir()
787 .map(|d| d.join("sessions"))
788 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
789 }
790
791 fn session_path(id: &str) -> Result<PathBuf> {
793 Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
794 }
795}
796
797#[derive(Debug, Clone, Serialize, Deserialize)]
799pub struct SessionResult {
800 pub text: String,
801 pub session_id: String,
802}
803
804#[derive(Debug, Clone)]
806pub enum SessionEvent {
807 Thinking,
809 ToolCallStart { name: String, arguments: String },
811 ToolCallComplete {
813 name: String,
814 output: String,
815 success: bool,
816 },
817 TextChunk(String),
819 TextComplete(String),
821 Done,
823 Error(String),
825}
826
827pub async fn list_sessions() -> Result<Vec<SessionSummary>> {
829 let sessions_dir = crate::config::Config::data_dir()
830 .map(|d| d.join("sessions"))
831 .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
832
833 if !sessions_dir.exists() {
834 return Ok(Vec::new());
835 }
836
837 let mut summaries = Vec::new();
838 let mut entries = fs::read_dir(&sessions_dir).await?;
839
840 while let Some(entry) = entries.next_entry().await? {
841 let path = entry.path();
842 if path.extension().map(|e| e == "json").unwrap_or(false) {
843 if let Ok(content) = fs::read_to_string(&path).await {
844 if let Ok(session) = serde_json::from_str::<Session>(&content) {
845 summaries.push(SessionSummary {
846 id: session.id,
847 title: session.title,
848 created_at: session.created_at,
849 updated_at: session.updated_at,
850 message_count: session.messages.len(),
851 agent: session.agent,
852 directory: session.metadata.directory,
853 });
854 }
855 }
856 }
857 }
858
859 summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
860 Ok(summaries)
861}
862
863pub async fn list_sessions_for_directory(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
868 let all = list_sessions().await?;
869 let canonical = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
870 Ok(all
871 .into_iter()
872 .filter(|s| {
873 s.directory
874 .as_ref()
875 .map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()) == canonical)
876 .unwrap_or(false)
877 })
878 .collect())
879}
880
881#[derive(Debug, Clone, Serialize, Deserialize)]
883pub struct SessionSummary {
884 pub id: String,
885 pub title: Option<String>,
886 pub created_at: DateTime<Utc>,
887 pub updated_at: DateTime<Utc>,
888 pub message_count: usize,
889 pub agent: String,
890 #[serde(default)]
892 pub directory: Option<PathBuf>,
893}
894
895fn truncate_with_ellipsis(value: &str, max_chars: usize) -> String {
896 if max_chars == 0 {
897 return String::new();
898 }
899
900 let mut chars = value.chars();
901 let mut output = String::new();
902 for _ in 0..max_chars {
903 if let Some(ch) = chars.next() {
904 output.push(ch);
905 } else {
906 return value.to_string();
907 }
908 }
909
910 if chars.next().is_some() {
911 format!("{output}...")
912 } else {
913 output
914 }
915}
916
917#[allow(dead_code)]
919use futures::StreamExt;
920
921#[allow(dead_code)]
922trait AsyncCollect<T> {
923 async fn collect(self) -> Vec<T>;
924}
925
926#[allow(dead_code)]
927impl<S, T> AsyncCollect<T> for S
928where
929 S: futures::Stream<Item = T> + Unpin,
930{
931 async fn collect(mut self) -> Vec<T> {
932 let mut items = Vec::new();
933 while let Some(item) = self.next().await {
934 items.push(item);
935 }
936 items
937 }
938}