mimo_api/client.rs
1//! HTTP client for the MiMo API.
2
3use {
4 crate::{
5 error::{Error, Result},
6 types::*,
7 },
8 eventsource_stream::Eventsource,
9 futures::{StreamExt, stream::BoxStream},
10 reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue},
11 std::env,
12};
13
14const API_BASE_URL: &str = "https://api.xiaomimimo.com/v1";
15const ENV_API_KEY: &str = "XIAOMI_API_KEY";
16
17/// HTTP client for the MiMo API.
18#[derive(Debug, Clone)]
19pub struct Client {
20 /// The underlying HTTP client.
21 http_client: reqwest::Client,
22 /// The API key for authentication.
23 api_key: String,
24 /// The base URL for the API.
25 base_url: String,
26}
27
28impl Client {
29 /// Create a new client with the given API key.
30 ///
31 /// # Example
32 ///
33 /// ```rust
34 /// use mimo_api::Client;
35 ///
36 /// let client = Client::new("your-api-key");
37 /// ```
38 pub fn new(api_key: impl Into<String>) -> Self {
39 Self {
40 http_client: reqwest::Client::new(),
41 api_key: api_key.into(),
42 base_url: API_BASE_URL.to_string(),
43 }
44 }
45
46 /// Create a new client from the `XIAOMI_API_KEY` environment variable.
47 ///
48 /// # Errors
49 ///
50 /// Returns an error if the `XIAOMI_API_KEY` environment variable is not set.
51 ///
52 /// # Example
53 ///
54 /// ```rust,no_run
55 /// use mimo_api::Client;
56 ///
57 /// // Assuming XIAOMI_API_KEY is set in environment
58 /// let client = Client::from_env()?;
59 /// # Ok::<(), Box<dyn std::error::Error>>(())
60 /// ```
61 pub fn from_env() -> Result<Self> {
62 let api_key = env::var(ENV_API_KEY).map_err(|_| Error::MissingApiKey)?;
63 Ok(Self::new(api_key))
64 }
65
66 /// Set a custom base URL for the API.
67 ///
68 /// This is useful for testing or using a custom API endpoint.
69 pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
70 self.base_url = base_url.into();
71 self
72 }
73
74 /// Build headers for the request.
75 fn build_headers(&self) -> Result<HeaderMap> {
76 let mut headers = HeaderMap::new();
77 headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
78 headers.insert(
79 "api-key",
80 HeaderValue::from_str(&self.api_key)
81 .map_err(|_| Error::InvalidParameter("Invalid API key".into()))?,
82 );
83 Ok(headers)
84 }
85
86 /// Send a chat completion request.
87 ///
88 /// # Example
89 ///
90 /// ```rust,no_run
91 /// use mimo_api::{Client, ChatRequest, Message};
92 ///
93 /// #[tokio::main]
94 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
95 /// let client = Client::from_env()?;
96 /// let request = ChatRequest::new("mimo-v2-flash")
97 /// .message(Message::user("Hello!"));
98 /// let response = client.chat(request).await?;
99 /// println!("{}", response.choices[0].message.content);
100 /// Ok(())
101 /// }
102 /// ```
103 pub async fn chat(&self, request: ChatRequest) -> Result<ChatResponse> {
104 let url = format!("{}/chat/completions", self.base_url);
105 let headers = self.build_headers()?;
106
107 let response = self
108 .http_client
109 .post(&url)
110 .headers(headers)
111 .json(&request)
112 .send()
113 .await?;
114
115 let status = response.status();
116 if !status.is_success() {
117 let error_text = response.text().await.unwrap_or_default();
118 return Err(Error::api_error(status.as_u16(), error_text));
119 }
120
121 response.json().await.map_err(Error::from)
122 }
123
124 /// Send a chat completion request with streaming response.
125 ///
126 /// Returns a stream of `StreamChunk` objects.
127 ///
128 /// # Example
129 ///
130 /// ```rust,no_run
131 /// use mimo_api::{Client, ChatRequest, Message};
132 /// use futures::StreamExt;
133 ///
134 /// #[tokio::main]
135 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
136 /// let client = Client::from_env()?;
137 /// let request = ChatRequest::new("mimo-v2-flash")
138 /// .message(Message::user("Tell me a story."))
139 /// .stream(true);
140 ///
141 /// let mut stream = client.chat_stream(request).await?;
142 /// while let Some(chunk) = stream.next().await {
143 /// match chunk {
144 /// Ok(chunk) => {
145 /// if let Some(content) = &chunk.choices[0].delta.content {
146 /// print!("{}", content);
147 /// }
148 /// }
149 /// Err(e) => eprintln!("Error: {}", e),
150 /// }
151 /// }
152 /// Ok(())
153 /// }
154 /// ```
155 pub async fn chat_stream(
156 &self,
157 request: ChatRequest,
158 ) -> Result<BoxStream<'static, Result<StreamChunk>>> {
159 let mut request = request;
160 request.stream = Some(true);
161
162 let url = format!("{}/chat/completions", self.base_url);
163 let headers = self.build_headers()?;
164
165 let response = self
166 .http_client
167 .post(&url)
168 .headers(headers)
169 .json(&request)
170 .send()
171 .await?;
172
173 let status = response.status();
174 if !status.is_success() {
175 let error_text = response.text().await.unwrap_or_default();
176 return Err(Error::api_error(status.as_u16(), error_text));
177 }
178
179 let stream = response
180 .bytes_stream()
181 .eventsource()
182 .filter_map(|event| async move {
183 match event {
184 Ok(event) => {
185 if event.data == "[DONE]" {
186 None
187 } else {
188 match serde_json::from_str::<StreamChunk>(&event.data) {
189 Ok(chunk) => Some(Ok(chunk)),
190 Err(e) => Some(Err(Error::StreamError(e.to_string()))),
191 }
192 }
193 }
194 Err(e) => Some(Err(Error::StreamError(e.to_string()))),
195 }
196 })
197 .boxed();
198
199 Ok(stream)
200 }
201
202 /// Create a text-to-speech request builder.
203 ///
204 /// This method creates a builder for synthesizing speech from text using the `mimo-v2-tts` model.
205 ///
206 /// # Arguments
207 ///
208 /// * `text` - The text to synthesize. This text will be placed in an `assistant` message.
209 ///
210 /// # Example
211 ///
212 /// ```rust,no_run
213 /// use mimo_api::{Client, Voice};
214 ///
215 /// #[tokio::main]
216 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
217 /// let client = Client::from_env()?;
218 ///
219 /// let response = client.tts("Hello, world!")
220 /// .voice(Voice::DefaultEn)
221 /// .send()
222 /// .await?;
223 ///
224 /// let audio = response.audio()?;
225 /// let audio_bytes = audio.decode_data()?;
226 /// std::fs::write("output.wav", audio_bytes)?;
227 /// Ok(())
228 /// }
229 /// ```
230 pub fn tts(&self, text: impl Into<String>) -> TtsRequestBuilder {
231 TtsRequestBuilder::new(self.clone(), text.into())
232 }
233
234 /// Create a text-to-speech request builder with styled text.
235 ///
236 /// This method allows you to apply style controls to the synthesized speech.
237 ///
238 /// # Example
239 ///
240 /// ```rust,no_run
241 /// use mimo_api::{Client, Voice};
242 ///
243 /// #[tokio::main]
244 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
245 /// let client = Client::from_env()?;
246 ///
247 /// // Synthesize speech with "开心" (happy) style
248 /// let response = client.tts_styled("开心", "明天就是周五了,真开心!")
249 /// .voice(Voice::DefaultZh)
250 /// .send()
251 /// .await?;
252 ///
253 /// let audio = response.audio()?;
254 /// let audio_bytes = audio.decode_data()?;
255 /// std::fs::write("output.wav", audio_bytes)?;
256 /// Ok(())
257 /// }
258 /// ```
259 pub fn tts_styled(&self, style: &str, text: &str) -> TtsRequestBuilder {
260 TtsRequestBuilder::new(self.clone(), styled_text(style, text))
261 }
262
263 /// Create a streaming text-to-speech request builder.
264 ///
265 /// This method creates a builder for streaming speech synthesis using the `mimo-v2-tts` model.
266 /// Streaming TTS delivers audio data in real-time chunks.
267 ///
268 /// # Arguments
269 ///
270 /// * `text` - The text to synthesize. This text will be placed in an `assistant` message.
271 ///
272 /// # Example
273 ///
274 /// ```rust,no_run
275 /// use mimo_api::{Client, Voice};
276 /// use futures::StreamExt;
277 /// use tokio::fs::File;
278 /// use tokio::io::AsyncWriteExt;
279 ///
280 /// #[tokio::main]
281 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
282 /// let client = Client::from_env()?;
283 ///
284 /// let mut stream = client.tts_stream("Hello, world!")
285 /// .voice(Voice::DefaultEn)
286 /// .send()
287 /// .await?;
288 ///
289 /// let mut file = File::create("output.pcm").await?;
290 /// let mut total_bytes = 0;
291 ///
292 /// while let Some(chunk) = stream.next().await {
293 /// let audio_bytes = chunk?;
294 /// file.write_all(&audio_bytes).await?;
295 /// total_bytes += audio_bytes.len();
296 /// }
297 ///
298 /// println!("Total bytes: {}", total_bytes);
299 /// Ok(())
300 /// }
301 /// ```
302 pub fn tts_stream(&self, text: impl Into<String>) -> StreamingTtsRequestBuilder {
303 StreamingTtsRequestBuilder::new(self.clone(), text.into())
304 }
305
306 /// Create a streaming text-to-speech request builder with styled text.
307 ///
308 /// This method allows you to apply style controls to the streaming synthesized speech.
309 ///
310 /// # Arguments
311 ///
312 /// * `style` - The style to apply (e.g., "开心", "悲伤", "变快", "变慢")
313 /// * `text` - The text to synthesize
314 ///
315 /// # Example
316 ///
317 /// ```rust,no_run
318 /// use mimo_api::{Client, Voice};
319 /// use futures::StreamExt;
320 /// use tokio::fs::File;
321 /// use tokio::io::AsyncWriteExt;
322 ///
323 /// #[tokio::main]
324 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
325 /// let client = Client::from_env()?;
326 ///
327 /// // Synthesize speech with "开心" (happy) style
328 /// let mut stream = client.tts_styled_stream("开心", "明天就是周五了,真开心!")
329 /// .voice(Voice::DefaultZh)
330 /// .send()
331 /// .await?;
332 ///
333 /// let mut file = File::create("output.pcm").await?;
334 /// let mut total_bytes = 0;
335 ///
336 /// while let Some(chunk) = stream.next().await {
337 /// let audio_bytes = chunk?;
338 /// file.write_all(&audio_bytes).await?;
339 /// total_bytes += audio_bytes.len();
340 /// }
341 ///
342 /// println!("Total bytes: {}", total_bytes);
343 /// Ok(())
344 /// }
345 /// ```
346 pub fn tts_styled_stream(&self, style: &str, text: &str) -> StreamingTtsRequestBuilder {
347 StreamingTtsRequestBuilder::new(self.clone(), styled_text(style, text))
348 }
349}
350
351/// Builder for text-to-speech requests.
352///
353/// This builder provides a fluent API for configuring TTS requests.
354#[derive(Debug, Clone)]
355pub struct TtsRequestBuilder {
356 client: Client,
357 text: String,
358 user_message: Option<String>,
359 voice: Voice,
360 format: AudioFormat,
361}
362
363impl TtsRequestBuilder {
364 /// Create a new TTS request builder.
365 fn new(client: Client, text: String) -> Self {
366 Self {
367 client,
368 text,
369 user_message: None,
370 voice: Voice::default(),
371 format: AudioFormat::default(),
372 }
373 }
374
375 /// Set the voice for synthesis.
376 ///
377 /// Available voices:
378 /// - `Voice::MimoDefault` - MiMo default voice (balanced tone)
379 /// - `Voice::DefaultEn` - Default English female voice
380 /// - `Voice::DefaultZh` - Default Chinese female voice
381 pub fn voice(mut self, voice: Voice) -> Self {
382 self.voice = voice;
383 self
384 }
385
386 /// Set the audio output format.
387 ///
388 /// Available formats:
389 /// - `AudioFormat::Wav` - WAV format (recommended for high quality)
390 /// - `AudioFormat::Mp3` - MP3 format (smaller file size)
391 /// - `AudioFormat::Pcm` - PCM format (for streaming)
392 pub fn format(mut self, format: AudioFormat) -> Self {
393 self.format = format;
394 self
395 }
396
397 /// Add a user message to influence the synthesis style.
398 ///
399 /// The user message can help adjust the tone and style of the synthesized speech.
400 pub fn user_message(mut self, message: impl Into<String>) -> Self {
401 self.user_message = Some(message.into());
402 self
403 }
404
405 /// Send the TTS request and return the response.
406 ///
407 /// # Returns
408 ///
409 /// A `TtsResponse` containing the synthesized audio data.
410 ///
411 /// # Example
412 ///
413 /// ```rust,no_run
414 /// use mimo_api::{Client, Voice, AudioFormat};
415 ///
416 /// #[tokio::main]
417 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
418 /// let client = Client::from_env()?;
419 ///
420 /// let response = client.tts("Hello, world!")
421 /// .voice(Voice::DefaultEn)
422 /// .format(AudioFormat::Mp3)
423 /// .send()
424 /// .await?;
425 ///
426 /// let audio = response.audio()?;
427 /// println!("Audio ID: {}", audio.id);
428 /// println!("Transcript: {:?}", audio.transcript());
429 /// Ok(())
430 /// }
431 /// ```
432 pub async fn send(self) -> Result<TtsResponse> {
433 let mut messages = Vec::new();
434
435 // Add optional user message
436 if let Some(user_msg) = self.user_message {
437 messages.push(Message::user(MessageContent::Text(user_msg)));
438 }
439
440 // Add assistant message with text to synthesize
441 messages.push(Message::assistant(MessageContent::Text(self.text)));
442
443 let request = ChatRequest {
444 model: Model::MiMoV2Tts.to_string(),
445 messages,
446 audio: Some(Audio {
447 format: Some(self.format),
448 voice: Some(self.voice),
449 }),
450 ..Default::default()
451 };
452
453 let response = self.client.chat(request).await?;
454 Ok(TtsResponse(response))
455 }
456}
457
458/// Response from a text-to-speech request.
459#[derive(Debug, Clone)]
460pub struct TtsResponse(pub ChatResponse);
461
462impl TtsResponse {
463 /// Get the audio data from the response.
464 ///
465 /// # Errors
466 ///
467 /// Returns an error if no audio data is present in the response.
468 pub fn audio(&self) -> Result<&ResponseAudio> {
469 self.0
470 .choices
471 .first()
472 .and_then(|c| c.message.audio.as_ref())
473 .ok_or_else(|| Error::InvalidResponse("No audio data in response".into()))
474 }
475
476 /// Get the content text from the response.
477 pub fn content(&self) -> Option<&str> {
478 self.0.choices.first().map(|c| c.message.content.as_str())
479 }
480
481 /// Get the underlying chat response.
482 pub fn into_inner(self) -> ChatResponse {
483 self.0
484 }
485}
486
487/// Builder for streaming text-to-speech requests.
488///
489/// This builder provides a fluent API for configuring streaming TTS requests.
490#[derive(Debug, Clone)]
491pub struct StreamingTtsRequestBuilder {
492 client: Client,
493 text: String,
494 user_message: Option<String>,
495 voice: Voice,
496}
497
498impl StreamingTtsRequestBuilder {
499 /// Create a new streaming TTS request builder.
500 fn new(client: Client, text: String) -> Self {
501 Self {
502 client,
503 text,
504 user_message: None,
505 voice: Voice::default(),
506 }
507 }
508
509 /// Set the voice for synthesis.
510 ///
511 /// Available voices:
512 /// - `Voice::MimoDefault` - MiMo default voice (balanced tone)
513 /// - `Voice::DefaultEn` - Default English female voice
514 /// - `Voice::DefaultZh` - Default Chinese female voice
515 ///
516 /// # Example
517 ///
518 /// ```rust,no_run
519 /// use mimo_api::{Client, Voice};
520 ///
521 /// #[tokio::main]
522 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
523 /// let client = Client::from_env()?;
524 ///
525 /// let stream = client.tts_stream("Hello!")
526 /// .voice(Voice::DefaultEn)
527 /// .send()
528 /// .await?;
529 ///
530 /// Ok(())
531 /// }
532 /// ```
533 pub fn voice(mut self, voice: Voice) -> Self {
534 self.voice = voice;
535 self
536 }
537
538 /// Add a user message to influence the synthesis style.
539 ///
540 /// The user message can help adjust the tone and style of the synthesized speech.
541 ///
542 /// # Example
543 ///
544 /// ```rust,no_run
545 /// use mimo_api::Client;
546 ///
547 /// #[tokio::main]
548 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
549 /// let client = Client::from_env()?;
550 ///
551 /// let stream = client.tts_stream("Hello there!")
552 /// .user_message("Speak in a friendly, conversational tone")
553 /// .send()
554 /// .await?;
555 ///
556 /// Ok(())
557 /// }
558 /// ```
559 pub fn user_message(mut self, message: impl Into<String>) -> Self {
560 self.user_message = Some(message.into());
561 self
562 }
563
564 /// Send the streaming TTS request and return the response stream.
565 ///
566 /// # Returns
567 ///
568 /// A `StreamingTtsResponse` that yields audio data chunks.
569 ///
570 /// # Example
571 ///
572 /// ```rust,no_run
573 /// use mimo_api::{Client, Voice};
574 /// use futures::StreamExt;
575 /// use tokio::fs::File;
576 /// use tokio::io::AsyncWriteExt;
577 ///
578 /// #[tokio::main]
579 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
580 /// let client = Client::from_env()?;
581 ///
582 /// let mut stream = client.tts_stream("Hello, world!")
583 /// .voice(Voice::DefaultEn)
584 /// .send()
585 /// .await?;
586 ///
587 /// let mut file = File::create("output.pcm").await?;
588 /// let mut total_bytes = 0;
589 ///
590 /// while let Some(result) = stream.next().await {
591 /// let audio_bytes = result?;
592 /// file.write_all(&audio_bytes).await?;
593 /// total_bytes += audio_bytes.len();
594 /// }
595 ///
596 /// println!("Total bytes: {}", total_bytes);
597 /// Ok(())
598 /// }
599 /// ```
600 pub async fn send(self) -> Result<StreamingTtsResponse> {
601 let mut messages = Vec::new();
602
603 // Add optional user message
604 if let Some(user_msg) = self.user_message {
605 messages.push(Message::user(MessageContent::Text(user_msg)));
606 }
607
608 // Add assistant message with text to synthesize
609 messages.push(Message::assistant(MessageContent::Text(self.text)));
610
611 let request = ChatRequest {
612 model: Model::MiMoV2Tts.to_string(),
613 messages,
614 stream: Some(true),
615 audio: Some(Audio {
616 format: Some(AudioFormat::Pcm), // PCM is recommended for streaming
617 voice: Some(self.voice),
618 }),
619 ..Default::default()
620 };
621
622 let stream = self.client.chat_stream(request).await?;
623 Ok(StreamingTtsResponse::new(stream))
624 }
625}
626
627/// Response from a streaming text-to-speech request.
628///
629/// This type wraps the underlying stream and provides convenience methods
630/// for consuming audio data.
631pub struct StreamingTtsResponse {
632 stream: BoxStream<'static, Result<StreamChunk>>,
633 total_bytes: u64,
634 chunk_count: u32,
635}
636
637impl StreamingTtsResponse {
638 /// Create a new streaming TTS response.
639 fn new(stream: BoxStream<'static, Result<StreamChunk>>) -> Self {
640 Self {
641 stream,
642 total_bytes: 0,
643 chunk_count: 0,
644 }
645 }
646
647 /// Collect all audio chunks and return them as a single byte vector.
648 ///
649 /// This is a convenience method for non-streaming use cases where you
650 /// want to wait for all audio data before processing it.
651 ///
652 /// # Example
653 ///
654 /// ```rust,no_run
655 /// use mimo_api::Client;
656 ///
657 /// #[tokio::main]
658 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
659 /// let client = Client::from_env()?;
660 ///
661 /// let mut stream = client.tts_stream("Hello, world!").send().await?;
662 /// let audio_bytes = stream.collect_audio().await?;
663 ///
664 /// tokio::fs::write("output.pcm", &audio_bytes).await?;
665 /// println!("Total bytes: {}", audio_bytes.len());
666 ///
667 /// Ok(())
668 /// }
669 /// ```
670 pub async fn collect_audio(&mut self) -> Result<Vec<u8>> {
671 let mut all_bytes = Vec::new();
672
673 while let Some(chunk) = self.stream.next().await {
674 if let Some(audio_bytes) = self.process_chunk(chunk?)? {
675 all_bytes.extend(audio_bytes);
676 }
677 }
678
679 Ok(all_bytes)
680 }
681
682 /// Save all audio chunks to a file.
683 ///
684 /// This is a convenience method that collects all audio data and writes it to a file.
685 ///
686 /// # Example
687 ///
688 /// ```rust,no_run
689 /// use mimo_api::Client;
690 ///
691 /// #[tokio::main]
692 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
693 /// let client = Client::from_env()?;
694 ///
695 /// let mut stream: mimo_api::StreamingTtsResponse = client.tts_stream("Hello, world!").send().await?;
696 /// stream.save_to_file("output.pcm").await?;
697 ///
698 /// println!("Audio saved to file");
699 ///
700 /// Ok(())
701 /// }
702 /// ```
703 pub async fn save_to_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<()> {
704 use tokio::fs::File;
705 use tokio::io::AsyncWriteExt;
706
707 let mut file = File::create(path).await?;
708
709 while let Some(chunk) = self.stream.next().await {
710 if let Some(audio_bytes) = self.process_chunk(chunk?)? {
711 file.write_all(&audio_bytes).await?;
712 }
713 }
714
715 file.flush().await?;
716 Ok(())
717 }
718
719 /// Process a stream chunk and return audio bytes if present.
720 fn process_chunk(&mut self, chunk: StreamChunk) -> Result<Option<Vec<u8>>> {
721 if !chunk.choices.is_empty()
722 && let Some(audio) = &chunk.choices[0].delta.audio
723 {
724 let bytes = audio.decode_data()?;
725 self.total_bytes += bytes.len() as u64;
726 self.chunk_count += 1;
727 return Ok(Some(bytes));
728 }
729 Ok(None)
730 }
731
732 /// Get the total number of bytes received so far.
733 pub fn total_bytes(&self) -> u64 {
734 self.total_bytes
735 }
736
737 /// Get the number of audio chunks received so far.
738 pub fn chunk_count(&self) -> u32 {
739 self.chunk_count
740 }
741}
742
743impl futures::Stream for StreamingTtsResponse {
744 type Item = Result<Vec<u8>>;
745
746 fn poll_next(
747 mut self: std::pin::Pin<&mut Self>,
748 cx: &mut std::task::Context<'_>,
749 ) -> std::task::Poll<Option<Self::Item>> {
750 // Process chunks until we find one with audio data or the stream ends
751 loop {
752 match std::pin::Pin::new(&mut self.stream).poll_next(cx) {
753 std::task::Poll::Ready(Some(Ok(chunk))) => {
754 // Check if this is the final chunk with finish_reason
755 let is_final = chunk.choices.first().and_then(|c| c.finish_reason.as_ref()).is_some();
756
757 match self.process_chunk(chunk) {
758 Ok(Some(bytes)) => {
759 // Return audio data from this chunk
760 return std::task::Poll::Ready(Some(Ok(bytes)));
761 }
762 Ok(None) => {
763 // No audio data in this chunk
764 if is_final {
765 // Stream has ended, no more audio data
766 return std::task::Poll::Ready(None);
767 }
768 // Continue to next chunk
769 continue;
770 }
771 Err(e) => return std::task::Poll::Ready(Some(Err(e))),
772 }
773 }
774 std::task::Poll::Ready(Some(Err(e))) => {
775 let error_msg = format!("Stream error: {}", e);
776 return std::task::Poll::Ready(Some(Err(Error::StreamError(error_msg))));
777 }
778 std::task::Poll::Ready(None) => {
779 // Stream has ended normally
780 return std::task::Poll::Ready(None);
781 }
782 std::task::Poll::Pending => return std::task::Poll::Pending,
783 }
784 }
785 }
786}