use crate::common::define::{
AsyncResponseFn, BaseRequest, BaseResponse, BytesStream, HttpBuilder, HttpFn,
HttpStreamBuilder, RequestFn,
};
use bytes::Bytes;
use reqwest::{Method, Response};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ApiStorageUploadRequest {
#[serde(rename = "Path")]
pub path: Option<String>,
#[serde(rename = "Overwrite")]
pub overwrite: Option<bool>,
#[serde(rename = "Content")]
pub content: Option<Vec<u8>>,
}
impl ApiStorageUploadRequest {
pub fn new() -> Self {
Default::default()
}
pub fn with_path(mut self, path: String) -> Self {
self.path = Some(path);
self
}
pub fn with_content(mut self, content: Vec<u8>) -> Self {
self.content = Some(content);
self
}
pub fn with_overwrite(mut self, overwrite: bool) -> Self {
self.overwrite = Some(overwrite);
self
}
}
#[derive(derive_more::Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct ApiStorageUploadStreamRequest {
#[serde(rename = "Path")]
pub path: Option<String>,
#[serde(rename = "Overwrite")]
pub overwrite: Option<bool>,
#[debug(skip)]
#[serde(skip)]
pub stream: Option<BytesStream>,
}
impl Clone for ApiStorageUploadStreamRequest {
fn clone(&self) -> Self {
Self {
path: self.path.clone(),
overwrite: self.overwrite,
stream: None,
}
}
}
impl ApiStorageUploadStreamRequest {
pub fn new() -> Self {
Self::default()
}
pub fn with_path(mut self, path: String) -> Self {
self.path = Some(path);
self
}
pub fn with_overwrite(mut self, overwrite: bool) -> Self {
self.overwrite = Some(overwrite);
self
}
pub fn with_stream(mut self, stream: BytesStream) -> Self {
self.stream = Some(stream);
self
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ApiStorageUploadResponse {}
impl HttpBuilder for ApiStorageUploadRequest {
type Response = BaseResponse<ApiStorageUploadResponse>;
fn builder(self) -> HttpFn<Self::Response> {
Box::new(move || {
let request_fn: RequestFn = Box::new(move || {
let mut queries = HashMap::new();
if let Some(path) = &self.path {
queries.insert("Path".to_string(), path.clone());
}
if let Some(overwrite) = self.overwrite {
queries.insert("Overwrite".to_string(), overwrite.to_string());
}
let mut body = Bytes::new();
if let Some(content) = &self.content {
body = Bytes::from(content.clone());
}
BaseRequest {
method: Method::POST,
uri: "/api/storage/upload/file".to_string(),
queries: Some(queries),
body,
..Default::default()
}
});
let response_fn: AsyncResponseFn<Self::Response> =
Box::new(|response: Response| Box::pin(async move { Ok(response.json().await?) }));
(request_fn, response_fn)
})
}
}
impl HttpStreamBuilder for ApiStorageUploadStreamRequest {
type Response = BaseResponse<ApiStorageUploadResponse>;
fn stream_builder(self) -> HttpFn<Self::Response> {
Box::new(move || {
let request_fn: RequestFn = Box::new(move || {
let mut queries = HashMap::new();
if let Some(path) = &self.path {
queries.insert("Path".to_string(), path.clone());
}
if let Some(overwrite) = self.overwrite {
queries.insert("Overwrite".to_string(), overwrite.to_string());
}
BaseRequest {
method: Method::POST,
uri: "/api/storage/upload/file".to_string(),
queries: Some(queries),
stream: self.stream,
..Default::default()
}
});
let response_fn: AsyncResponseFn<Self::Response> =
Box::new(|response: Response| Box::pin(async move { Ok(response.json().await?) }));
(request_fn, response_fn)
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::client::OpenApiClient;
use crate::common::config::{EndpointType, OpenApiConfig};
use futures_util::StreamExt;
use futures_util::stream;
use tokio::fs::File;
use tokio_util::io::ReaderStream;
use tracing::info;
#[tokio::test]
async fn test_api_storage_upload() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
dotenvy::dotenv()?;
let config = OpenApiConfig::new().load_from_env()?;
let user_id = config.user_id.clone();
let mut client = OpenApiClient::new(config).with_endpoint_type(EndpointType::Cloud);
let http_fn = ApiStorageUploadRequest::new()
.with_path(format!("/{}/runner.py", user_id))
.with_content("print('hello world!')".as_bytes().to_vec())
.with_overwrite(true)
.builder();
let response = client.send(http_fn).await?;
info!("response: {:#?}", response);
Ok(())
}
#[tokio::test]
async fn test_api_storage_upload_bytes_stream() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
dotenvy::dotenv()?;
let config = OpenApiConfig::new().load_from_env()?;
let user_id = config.user_id.clone();
let mut client = OpenApiClient::new(config).with_endpoint_type(EndpointType::Cloud);
let http_fn = ApiStorageUploadStreamRequest::new()
.with_path(format!("/{}/runner.py", user_id))
.with_overwrite(true)
.with_stream(Box::pin(
stream::iter(
vec![
"Chunk 1: This is the first part of streaming data".to_string(),
"Chunk 2: This is the second part of streaming data".to_string(),
"Chunk 3: This is the final part of streaming data".to_string(),
]
.into_iter()
.map(|s| Bytes::from(s))
.collect::<Vec<_>>(),
)
.map(|chunk| Ok(chunk)),
))
.stream_builder();
let response = client.send(http_fn).await?;
info!("response: {:#?}", response);
Ok(())
}
#[tokio::test]
async fn test_api_storage_upload_file_stream() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
dotenvy::dotenv()?;
let config = OpenApiConfig::new().load_from_env()?;
let user_id = config.user_id.clone();
let mut client = OpenApiClient::new(config).with_endpoint_type(EndpointType::Cloud);
let tmp_file = "./tmp.txt";
let tmp_file_content = r#"This is a test file content for streaming.
It contains multiple lines of text, and each line will be sent as a separate chunk.
Each line will be sent as a separate chunk."#;
tokio::fs::write(tmp_file, tmp_file_content).await?;
let http_fn = ApiStorageUploadStreamRequest::new()
.with_path(format!("/{}/runner.py", user_id))
.with_overwrite(true)
.with_stream(Box::pin(
ReaderStream::new(File::open(&tmp_file).await?).map(|result| match result {
Ok(bytes) => Ok(bytes),
Err(_) => Ok(Bytes::new()),
}),
))
.stream_builder();
let response = client.send(http_fn).await?;
info!("response: {:#?}", response);
tokio::fs::remove_file(tmp_file).await?;
Ok(())
}
}