use crate::action_factory::{ActionTrait, ActionData};
use log::*;
use simple_error::*;
use crate::runtime;
use std::sync::{Mutex,Arc};
use std::time::Duration;
use tungstenite::{connect, client, protocol::WebSocketConfig, Error};
use url::Url;
use tungstenite::client::{IntoClientRequest, AutoStream };
use tungstenite::stream::*;
use tungstenite::error::{UrlError, TlsError,};
use std::net::{ToSocketAddrs, SocketAddr, TcpStream};
use tungstenite::http::Uri;
use native_tls::{TlsConnector,HandshakeError as TlsHandshakeError};
/// Action that checks whether a WebSocket endpoint can be reached.
///
/// It wraps the task description it was created with and reports the
/// outcome of each run back through that same task object.
pub struct ActionWebsocket {
    /// Task data this action operates on (id, name, action settings).
    task: ActionData,
}
pub use tungstenite::stream::Stream as StreamSwitcher;
/// Wraps an already-connected TCP stream according to `mode`.
///
/// * `Mode::Plain` returns the stream unchanged inside `StreamSwitcher::Plain`.
/// * `Mode::Tls` builds a default `TlsConnector` and performs the TLS
///   handshake against `domain`, returning `StreamSwitcher::Tls` on success.
///
/// # Errors
/// Returns a TLS error when the connector cannot be built or the handshake
/// fails.
///
/// # Panics
/// Panics if the handshake reports `WouldBlock`; the stream passed in is
/// expected to be in blocking mode.
fn wrap_stream(stream: TcpStream, domain: &str, mode: Mode) -> Result<AutoStream, tungstenite::Error> {
    // The plain case needs no further work, so get it out of the way first.
    match mode {
        Mode::Plain => return Ok(StreamSwitcher::Plain(stream)),
        Mode::Tls => {}
    }

    let tls_connector = TlsConnector::builder().build().map_err(TlsError::Native)?;
    match tls_connector.connect(domain, stream) {
        Ok(tls_stream) => Ok(StreamSwitcher::Tls(tls_stream)),
        Err(TlsHandshakeError::Failure(cause)) => Err(TlsError::Native(cause).into()),
        Err(TlsHandshakeError::WouldBlock(_)) => {
            panic!("Bug: TLS handshake not blocked")
        }
    }
}
fn connect_to_some_timeout(addrs: &[SocketAddr], uri: &Uri, mode: Mode,timeout:Duration) -> Result<AutoStream, tungstenite::Error> {
let domain = uri.host().ok_or(Error::Url(UrlError::NoHostName))?;
for addr in addrs {
info!("Trying to contact {} at {}...", uri, addr);
if let Ok(raw_stream) = TcpStream::connect_timeout(addr,timeout) {
if let Ok(stream) = wrap_stream(raw_stream, domain, mode) {
return Ok(stream);
}
}
}
Err(Error::Url(UrlError::UnableToConnect(uri.to_string())))
}
impl ActionWebsocket {
fn executor_tungstenite(&mut self) {
let task_conf = &self.task.task_action;
let result_data = Arc::new(Mutex::new( String::new()));
let async_rd = result_data.clone();
let startTime = chrono::Utc::now().time();
let url = Url::parse(task_conf.target.as_str()).unwrap();
let requestUri = url.into_client_request().unwrap();
let mode = client::uri_mode(requestUri.uri()).unwrap();
let host = requestUri.uri().host().ok_or(Error::Url(UrlError::NoHostName)).unwrap();
let port = requestUri.uri().port_u16().unwrap_or(match mode {
Mode::Plain => 80,
Mode::Tls => 443,
});
let addrs = (host, port).to_socket_addrs().unwrap();
let mut r = connect_to_some_timeout(
addrs.as_slice(), requestUri.uri(), mode,
Duration::from_millis(task_conf.timeout as u64));
match r {
Ok(stream) => {
let url = Url::parse(task_conf.target.as_str()).unwrap();
let cli = client::client(url,stream);
match cli {
Ok(e) => {
if task_conf.output_result {
info!("websocket {} connect success,time:{}ms...",task_conf.target,(chrono::Utc::now().time() - startTime).num_milliseconds());
}
self.task.do_recover();
}
Err(e) => {
error!("connect websocket {} ,error: {:?}",task_conf.target,e);
}
}
}
Err(e) => {
error!("connect websocket {} ,error: {:?}",task_conf.target,e);
self.task.do_failed();
}
}
}
}
impl ActionTrait for ActionWebsocket {
    /// Returns the id of the underlying task.
    fn id(&self) -> i32 {
        self.task.id
    }

    /// Returns the name of the underlying task.
    fn name(&self) -> &str {
        self.task.name.as_str()
    }

    /// Replaces the task data this action operates on.
    fn append_data(&mut self, data: ActionData) {
        self.task = data;
    }

    /// Logs the start of the action and runs the WebSocket check.
    fn executor(&mut self) {
        // Arguments follow the labels in the message: name first, then id.
        info!("start websocket action:{} id:{}...", self.task.name, self.task.id);
        self.executor_tungstenite();
    }
}
/// Creates a WebSocket action that operates on the given task data.
pub fn new_websocket(data: ActionData) -> ActionWebsocket {
    let task = data;
    ActionWebsocket { task }
}