use crate::error::{FetchError, Result, TypeError};
use bytes::Bytes;
use serde_json::Value;
/// Payload held by a [`ReadableStream`].
///
/// Each variant records the form in which the body was supplied; conversion
/// to bytes, text, or JSON happens when the stream is read.
#[derive(Debug, Clone)]
pub enum BodySource {
/// No body content; reads as zero bytes or an empty string.
Empty,
/// Body supplied as an owned string.
Text(String),
/// Body supplied as a raw byte buffer.
Bytes(Bytes),
/// Body supplied as a JSON value, serialized when read as bytes or text.
Json(Value),
}
/// A body payload paired with a flag recording whether it has been consumed.
///
/// The consuming readers (`array_buffer`, `json`, `text`) take the stream by
/// value and fail with a "Body already used" type error when `used` is set.
#[derive(Debug, Clone)]
pub struct ReadableStream {
// The payload returned by the reader methods.
source: BodySource,
// When true, the consuming readers reject the stream instead of reading it.
used: bool,
}
impl ReadableStream {
pub fn empty() -> Self {
Self {
source: BodySource::Empty,
used: false,
}
}
pub fn from_text(text: &str) -> Self {
Self {
source: BodySource::Text(text.to_string()),
used: false,
}
}
pub fn from_bytes(bytes: Bytes) -> Self {
Self {
source: BodySource::Bytes(bytes),
used: false,
}
}
pub fn from_json(value: &Value) -> Self {
Self {
source: BodySource::Json(value.clone()),
used: false,
}
}
pub fn locked(&self) -> bool {
false
}
pub async fn array_buffer(mut self) -> Result<Bytes> {
if self.used {
return Err(FetchError::Type(TypeError::new("Body already used")));
}
self.used = true;
match self.source {
BodySource::Empty => Ok(Bytes::new()),
BodySource::Text(text) => Ok(Bytes::from(text.into_bytes())),
BodySource::Bytes(bytes) => Ok(bytes),
BodySource::Json(value) => {
let vec = serde_json::to_vec(&value)?;
Ok(Bytes::from(vec))
}
}
}
pub async fn blob(self) -> Result<Bytes> {
self.array_buffer().await
}
pub async fn form_data(self) -> Result<String> {
self.text().await
}
pub async fn json<T: serde::de::DeserializeOwned>(mut self) -> Result<T> {
if self.used {
return Err(FetchError::Type(TypeError::new("Body already used")));
}
self.used = true;
match self.source {
BodySource::Empty => Err(FetchError::Type(TypeError::new(
"Unexpected end of JSON input",
))),
BodySource::Text(text) => Ok(serde_json::from_str(&text)?),
BodySource::Bytes(bytes) => Ok(serde_json::from_slice(&bytes)?),
BodySource::Json(value) => Ok(serde_json::from_value(value)?),
}
}
pub async fn text(mut self) -> Result<String> {
if self.used {
return Err(FetchError::Type(TypeError::new("Body already used")));
}
self.used = true;
match self.source {
BodySource::Empty => Ok(String::new()),
BodySource::Text(text) => Ok(text),
BodySource::Bytes(bytes) => String::from_utf8(bytes.to_vec())
.map_err(|_| FetchError::Type(TypeError::new("Invalid UTF-8"))),
BodySource::Json(value) => Ok(serde_json::to_string(&value)?),
}
}
pub(crate) fn get_content_type(&self) -> Option<&'static str> {
match self.source {
BodySource::Empty => None,
BodySource::Text(_) => Some("text/plain;charset=UTF-8"),
BodySource::Bytes(_) => None,
BodySource::Json(_) => Some("application/json"),
}
}
pub(crate) async fn to_bytes(&self) -> Result<Bytes> {
match &self.source {
BodySource::Empty => Ok(Bytes::new()),
BodySource::Text(text) => Ok(Bytes::from(text.as_bytes().to_vec())),
BodySource::Bytes(bytes) => Ok(bytes.clone()),
BodySource::Json(value) => {
let vec = serde_json::to_vec(value)?;
Ok(Bytes::from(vec))
}
}
}
pub(crate) fn is_used(&self) -> bool {
self.used
}
}
impl From<&str> for ReadableStream {
fn from(text: &str) -> Self {
Self::from_text(text)
}
}
impl From<String> for ReadableStream {
fn from(text: String) -> Self {
Self::from_text(&text)
}
}
impl From<Bytes> for ReadableStream {
fn from(bytes: Bytes) -> Self {
Self::from_bytes(bytes)
}
}
impl From<Vec<u8>> for ReadableStream {
fn from(bytes: Vec<u8>) -> Self {
Self::from_bytes(Bytes::from(bytes))
}
}
impl From<Value> for ReadableStream {
fn from(value: Value) -> Self {
Self::from_json(&value)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A text body is returned unchanged by `text()`.
    #[tokio::test]
    async fn test_readable_stream_text() {
        let body = ReadableStream::from_text("Hello, World!")
            .text()
            .await
            .unwrap();
        assert_eq!(body, "Hello, World!");
    }

    /// A byte body is returned unchanged by `array_buffer()`.
    #[tokio::test]
    async fn test_readable_stream_bytes() {
        let payload = vec![1, 2, 3, 4, 5];
        let stream = ReadableStream::from_bytes(Bytes::from(payload.clone()));
        let read_back = stream.array_buffer().await.unwrap();
        assert_eq!(read_back.to_vec(), payload);
    }

    /// A JSON body deserializes back into an equivalent value.
    #[tokio::test]
    async fn test_readable_stream_json() {
        let original = serde_json::json!({"key": "value", "number": 42});
        let decoded: serde_json::Value = ReadableStream::from_json(&original)
            .json()
            .await
            .unwrap();
        assert_eq!(decoded["key"], "value");
        assert_eq!(decoded["number"], 42);
    }

    /// An empty body reads as an empty string.
    #[tokio::test]
    async fn test_readable_stream_empty() {
        let body = ReadableStream::empty().text().await.unwrap();
        assert_eq!(body, "");
    }

    /// `blob()` yields the same bytes that were supplied.
    #[tokio::test]
    async fn test_readable_stream_blob() {
        let payload = vec![1, 2, 3, 4];
        let stream = ReadableStream::from_bytes(Bytes::from(payload.clone()));
        let read_back = stream.blob().await.unwrap();
        assert_eq!(read_back.to_vec(), payload);
    }

    /// `form_data()` yields the text body as-is.
    #[tokio::test]
    async fn test_readable_stream_form_data() {
        let body = ReadableStream::from_text("form data")
            .form_data()
            .await
            .unwrap();
        assert_eq!(body, "form data");
    }

    /// A freshly built stream is not locked.
    #[test]
    fn test_readable_stream_locked() {
        let is_locked = ReadableStream::from_text("test").locked();
        assert!(!is_locked);
    }

    /// Every supported `From` conversion compiles and runs.
    #[test]
    fn test_readable_stream_from_conversions() {
        let _from_str = ReadableStream::from("text");
        let _from_string = ReadableStream::from(String::from("text"));
        let _from_bytes = ReadableStream::from(Bytes::from(vec![1, 2, 3]));
        let _from_vec = ReadableStream::from(vec![1, 2, 3]);
        let _from_json = ReadableStream::from(serde_json::json!({"key": "value"}));
    }

    /// Each payload kind maps to its expected content type.
    #[test]
    fn test_get_content_type() {
        let cases = [
            (ReadableStream::empty(), None),
            (
                ReadableStream::from_text("hello"),
                Some("text/plain;charset=UTF-8"),
            ),
            (ReadableStream::from_bytes(Bytes::from(vec![1, 2, 3])), None),
            (
                ReadableStream::from_json(&serde_json::json!({})),
                Some("application/json"),
            ),
        ];
        for (stream, expected) in cases {
            assert_eq!(stream.get_content_type(), expected);
        }
    }

    /// `to_bytes()` serializes text and JSON bodies without consuming them.
    #[tokio::test]
    async fn test_to_bytes() {
        let text_stream = ReadableStream::from_text("hello");
        let text_bytes = text_stream.to_bytes().await.unwrap();
        assert_eq!(text_bytes, "hello".as_bytes());

        let json_stream = ReadableStream::from_json(&serde_json::json!({"key": "value"}));
        let json_bytes = json_stream.to_bytes().await.unwrap();
        let decoded: serde_json::Value = serde_json::from_slice(&json_bytes).unwrap();
        assert_eq!(decoded["key"], "value");
    }

    /// Reading a stream whose `used` flag is set fails with a type error.
    #[tokio::test]
    async fn test_body_already_used_error() {
        // A fresh stream reads successfully.
        let fresh = ReadableStream::from_text("test");
        let _ = fresh.text().await.unwrap();

        // A stream flagged as used is rejected.
        let mut spent = ReadableStream::from_text("test");
        spent.used = true;
        let outcome = spent.text().await;
        assert!(matches!(outcome, Err(FetchError::Type(_))));
    }

    /// Parsing JSON from an empty body fails with a type error.
    #[tokio::test]
    async fn test_json_empty_body_error() {
        let outcome: Result<serde_json::Value> = ReadableStream::empty().json().await;
        assert!(matches!(outcome, Err(FetchError::Type(_))));
    }

    /// Decoding non-UTF-8 bytes as text fails with a type error.
    #[tokio::test]
    async fn test_invalid_utf8_error() {
        let garbage = vec![0xFF, 0xFE, 0xFD];
        let outcome = ReadableStream::from_bytes(Bytes::from(garbage))
            .text()
            .await;
        assert!(matches!(outcome, Err(FetchError::Type(_))));
    }
}