#![cfg(feature = "tokio")]
use std::{env, error, net::SocketAddr};
use async_ssh2_lite::{util::ConnectInfo, AsyncSession};
use futures_util::future::join_all;
#[cfg(not(feature = "_integration_tests_tokio_ext"))]
use futures_util::AsyncReadExt as _;
#[cfg(feature = "_integration_tests_tokio_ext")]
use tokio::io::AsyncReadExt as _;
use super::{
helpers::{get_connect_addr, get_listen_addr, is_internal_test_openssh_server},
session__userauth_pubkey::__run__session__userauth_pubkey_file,
};
/// End-to-end check of `AsyncSession::remote_port_forwarding` on the tokio runtime.
///
/// Steps, in order:
/// 1. Spawn a hyper HTTP server on the address returned by `get_listen_addr()`;
///    it answers `/200` with an empty 200 response and everything else with 404.
/// 2. Connect and authenticate one SSH session, then spawn a task that calls
///    `remote_port_forwarding` with `remote_port` (HTTP listen port + 1) and the
///    HTTP server address as the TCP target.
/// 3. Sleep to give the forwarding a chance to come up.
/// 4. Concurrently open ten more SSH sessions, each running `curl` against
///    `127.0.0.1:{remote_port}` and asserting that the output is exactly `200`.
/// 5. Abort both background tasks and assert that each ended by cancellation.
///
/// # Errors
///
/// Propagates errors from resolving the SSH address and from connecting /
/// authenticating the forwarding session. Failures inside the curl runs are
/// collected and turned into an assertion failure instead.
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn simple_with_tokio() -> Result<(), Box<dyn error::Error>> {
    // Output type shared by both spawned background tasks.
    type TaskResult = Result<(), Box<dyn error::Error + Send + Sync>>;

    let http_server_listen_addr = get_listen_addr();
    let bind_addr = http_server_listen_addr;

    // Address handed to the forwarding as its TCP target.
    // NOTE(review): 172.17.0.1 is presumably the Docker bridge gateway used by
    // the internal test OpenSSH server — confirm.
    let forward_ip: [u8; 4] = if is_internal_test_openssh_server() {
        [172, 17, 0, 1]
    } else {
        [127, 0, 0, 1]
    };
    let forward_target = SocketAddr::from((forward_ip, http_server_listen_addr.port()));

    let ssh_addr = get_connect_addr()?;
    let remote_port = http_server_listen_addr.port() + 1;

    // Background task 1: the HTTP server that the forwarded connections reach.
    let server_task: tokio::task::JoinHandle<TaskResult> = tokio::task::spawn(async move {
        use core::convert::Infallible;
        use hyper::{
            service::{make_service_fn, service_fn},
            Body, Request, Response, Server, StatusCode,
        };

        // Empty-body response; only the `/200` path keeps the default status.
        async fn handle(req: Request<Body>) -> Result<Response<Body>, Infallible> {
            let mut res = Response::new(Body::from(""));
            if req.uri().path() != "/200" {
                *res.status_mut() = StatusCode::NOT_FOUND;
            }
            Ok(res)
        }

        let make_service =
            make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

        // Server errors are only logged; the task itself still resolves to `Ok`.
        if let Err(err) = Server::bind(&bind_addr).serve(make_service).await {
            eprintln!("server error, err:{err}");
        }
        Ok(())
    });

    // Session that owns the remote port forwarding.
    let mut forwarding_session =
        AsyncSession::<async_ssh2_lite::TokioTcpStream>::connect(ssh_addr, None).await?;
    __run__session__userauth_pubkey_file(&mut forwarding_session).await?;

    // Background task 2: drive the forwarding until it is aborted below.
    let forwarding_task: tokio::task::JoinHandle<TaskResult> = tokio::task::spawn(async move {
        let outcome = forwarding_session
            .remote_port_forwarding(remote_port, None, None, ConnectInfo::Tcp(forward_target))
            .await;
        // Forwarding errors are only logged; the task itself still resolves to `Ok`.
        if let Err(err) = outcome {
            eprintln!("session.remote_port_forwarding error, err:{err}");
        }
        Ok(())
    });

    // Fixed 500 ms for the internal test server; otherwise the number of seconds
    // from REMOTE_PORT_FORWARDING_WAIT_SECS (4 when unset or unparsable).
    let wait_millis: u64 = if is_internal_test_openssh_server() {
        500
    } else {
        let secs = env::var("REMOTE_PORT_FORWARDING_WAIT_SECS")
            .ok()
            .and_then(|raw| raw.parse::<u64>().ok())
            .unwrap_or(4);
        secs * 1000
    };
    tokio::time::sleep(tokio::time::Duration::from_millis(wait_millis)).await;

    // Ten concurrent sessions, each curling the forwarded port once.
    let rets = join_all((1..=10).map(|i| async move {
        let mut curl_session =
            AsyncSession::<async_ssh2_lite::TokioTcpStream>::connect(ssh_addr, None).await?;
        __run__session__userauth_pubkey_file(&mut curl_session).await?;

        let mut channel = curl_session.channel_session().await?;
        let command = format!(
            r#"curl http://127.0.0.1:{remote_port}/200 -H "x-foo: bar" -v -w "%{{http_code}}""#,
        );
        channel.exec(command.as_str()).await?;

        // The channel output is expected to be exactly the status code that
        // curl emits through `-w`.
        let mut s = String::new();
        channel.read_to_string(&mut s).await?;
        println!("remote_port_forwarding exec curl output:{s} i:{i}");
        assert_eq!(s, "200");

        channel.close().await?;
        let exit_status = channel.exit_status()?;
        println!("remote_port_forwarding exec curl exit_status:{} i:{i}", exit_status);

        Ok::<(), Box<dyn error::Error>>(())
    }))
    .await;
    println!("remote_port_forwarding exec curl rets:{rets:?}");
    assert!(rets.iter().all(Result::is_ok));

    // Tear down: both tasks must still be running, so awaiting them after
    // `abort()` has to yield a cancellation error.
    for task in [server_task, forwarding_task] {
        task.abort();
        assert!(task.await.unwrap_err().is_cancelled());
    }

    Ok(())
}