use std::thread;
use std::pin::Pin;
use std::sync::Arc;
use std::path::PathBuf;
use std::time::Duration;
use std::future::Future;
use std::result::Result as GenResult;
use std::task::{Context, Poll, Waker};
use std::io::{Error, Result, ErrorKind};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use bytes::Buf;
use futures::{future::{FutureExt, LocalBoxFuture}};
use flume::{Sender, Receiver, TrySendError, bounded};
use tcp::{Socket, SocketHandle,
utils::Ready};
/// Default supported HTTP version in numeric form (the string form below is "1.1").
pub const DEFAULT_SUPPORT_HTTP_VERSION: u8 = 1;
/// Default supported HTTP version in string form.
pub const DEFAULT_SUPPORT_HTTP_VERSION_STR: &str = "1.1";
/// Scheme name for plain HTTP.
pub const DEFAULT_HTTP_SCHEME: &str = "http";
/// Scheme name for HTTPS.
pub const DEFAULT_HTTPS_SCHEME: &str = "https";
/// Default port associated with the HTTP scheme.
pub const DEFAULT_HTTP_PORT: u16 = 80;
/// Default port associated with the HTTPS scheme.
pub const DEFAULT_HTTPS_PORT: u16 = 443;
/// Default chunk size: 512 * 1024 = 524288 (presumably bytes, i.e. 512 KiB — confirm at use sites).
///
/// NOTE: the identifier misspells "DEFAULT"; it is public, so renaming it would break callers.
pub const DEAFULT_CHUNK_SIZE: u64 = 512 * 1024;
/// Sending half of the channel returned by [`channel`].
///
/// Messages are transported as `Option<T>`; the receiving side in this file
/// treats a received `None` as the end-of-stream marker.
pub struct HttpSender<T: Send + Sync + 'static> {
/// Underlying flume sender carrying the `Option<T>` messages.
sender: Sender<Option<T>>, }
// SAFETY: the only field is a flume `Sender<Option<T>>` and `T` is bounded by
// `Send + Sync + 'static`, so soundness rests entirely on the flume sender being
// safe to move to and share between threads.
// NOTE(review): flume's `Sender` is presumably already `Send + Sync` for such `T`,
// which would make these manual impls redundant — confirm against the flume
// documentation before removing them.
unsafe impl<T: Send + Sync + 'static> Send for HttpSender<T> {}
unsafe impl<T: Send + Sync + 'static> Sync for HttpSender<T> {}
impl<T: Send + Sync + 'static> Clone for HttpSender<T> {
fn clone(&self) -> Self {
HttpSender {
sender: self.sender.clone(),
}
}
}
impl<T: Send + Sync + 'static> HttpSender<T> {
pub async fn send(&self, msg: Option<T>) -> Result<()> {
if let Err(e) = self.sender.send_async(msg).await {
Err(Error::new(ErrorKind::InvalidInput,
format!("Http channel send failed, reason: {:?}",
e)))
} else {
Ok(())
}
}
pub fn try_send(&self, msg: Option<T>) -> Result<()> {
match self.sender.try_send(msg) {
Ok(_) => Ok(()),
Err(TrySendError::Full(_)) => {
Err(Error::new(ErrorKind::WouldBlock,
"Http channel try send failed, reason: channel full"))
},
Err(TrySendError::Disconnected(_)) => {
Err(Error::new(ErrorKind::BrokenPipe,
"Http channel try send failed, reason: channel disconnected"))
},
}
}
}
/// Outcome of a receive operation on an [`HttpReceiver`].
///
/// * `Err` — the underlying channel receive failed; carries the I/O error.
/// * `Ok` — a value was received and the stream has not been marked finished
///   (produced by `HttpReceiver::next`).
/// * `Fin` — the `None` end-of-stream marker was received; carries the final
///   payload (the collected items for `recv`, `None` for `next`).
pub enum HttpRecvResult<T> {
Err(Error), Ok(T), Fin(T), }
/// Receiving half of the channel returned by [`channel`].
///
/// Items arrive as `Option<T>`; a `None` item is interpreted as the
/// end-of-stream marker by the receive methods.
pub struct HttpReceiver<T: Send + Sync + 'static> {
/// Underlying flume receiver yielding the `Option<T>` messages.
receiver: Receiver<Option<T>>, }
// SAFETY: the only field is a flume `Receiver<Option<T>>` and `T` is bounded by
// `Send + Sync + 'static`, so soundness rests entirely on the flume receiver being
// safe to move to and share between threads.
// NOTE(review): flume's `Receiver` is presumably already `Send + Sync` for such `T`,
// which would make these manual impls redundant — confirm against the flume
// documentation before removing them.
unsafe impl<T: Send + Sync + 'static> Send for HttpReceiver<T> {}
unsafe impl<T: Send + Sync + 'static> Sync for HttpReceiver<T> {}
impl<T: Send + Sync + 'static> HttpReceiver<T> {
pub async fn recv(&self) -> HttpRecvResult<Vec<T>> {
let mut buf = Vec::new();
loop {
match self.receiver.recv_async().await {
Err(e) => {
return HttpRecvResult::Err(Error::new(ErrorKind::BrokenPipe,
format!("Async recv http response failed, reason: {:?}",
e)));
},
Ok(None) => {
return HttpRecvResult::Fin(buf);
},
Ok(Some(data)) => {
buf.push(data);
},
}
}
}
pub async fn next(&self) -> HttpRecvResult<Option<T>> {
match self.receiver.recv_async().await {
Err(e) => {
HttpRecvResult::Err(Error::new(ErrorKind::BrokenPipe,
format!("Async recv http response failed, reason: {:?}",
e)))
},
Ok(None) => {
HttpRecvResult::Fin(None)
},
Ok(item) => {
HttpRecvResult::Ok(item)
},
}
}
}
/// Content encoding variants.
///
/// NOTE(review): the `u32` carried by `Deflate`, `Gzip` and `Br` is presumably a
/// compression level, and `Br` presumably stands for Brotli — confirm at the use
/// sites. `Emtpy` misspells "Empty" but is part of the public API, so renaming
/// it would break callers.
#[derive(Debug, Clone)]
pub enum ContentEncode {
Emtpy, Deflate(u32), Gzip(u32), Br(u32), }
/// Creates a connected [`HttpSender`] / [`HttpReceiver`] pair.
///
/// `size` is forwarded unchanged to `bounded` as the capacity argument of the
/// underlying channel, which transports `Option<T>` messages.
pub fn channel<T>(size: usize) -> (HttpSender<T>, HttpReceiver<T>)
where
    T: Send + Sync + 'static,
{
    let (tx, rx) = bounded(size);
    let http_sender = HttpSender { sender: tx };
    let http_receiver = HttpReceiver { receiver: rx };
    (http_sender, http_receiver)
}
/// Rebuilds a path from its individual components.
///
/// The input is converted into a `PathBuf`, iterated component by component
/// and reassembled. Because the standard library's component iteration skips
/// repeated separators, non-leading `.` components and trailing separators,
/// the rebuilt path no longer contains them. `..` components are kept as-is
/// and the file system is never consulted.
///
/// # Errors
///
/// This function currently always returns `Ok`; the `Result` return type is
/// kept for interface compatibility.
pub fn trim_path<P: Into<PathBuf>>(path: P) -> Result<PathBuf> {
    let input: PathBuf = path.into();
    // Collecting pushes every component onto one buffer, rather than
    // allocating a fresh `PathBuf` per component via `join`.
    Ok(input.iter().collect::<PathBuf>())
}