use async_compat::CompatExt;
use futures_lite::io::AsyncWriteExt;
use lazy_static::lazy_static;
use rand::distributions::Alphanumeric;
use rand::thread_rng;
use rand::Rng;
use simple_log::info;
use ssh_wrap::wezterm_ssh::SessionBuilder;
use std::time::Instant;
use tokio::task;
use wezterm_ssh::Sftp;
const FILE_SIZE: usize = 1024 * 512;
lazy_static! {
static ref FOO: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(FILE_SIZE)
.map(char::from)
.collect();
}
const TASK_NUM: usize = 100;
const SINGLE_TASK_FILE: usize = 2;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
simple_log::quick!();
let _ = std::fs::remove_dir_all("./tmp/sftp/");
std::fs::create_dir_all("./tmp/sftp/").unwrap();
let session = SessionBuilder::new_with_pass("foo", "0.0.0.0", "123456", 2222)
.disable_userknown_hosts_file()
.connect_with_pass()
.await
.unwrap();
let sftp = session.sftp();
let now = Instant::now();
let mut vec = vec![];
for i in 0..TASK_NUM {
let handle = task::spawn(write(sftp.clone(), i));
vec.push(handle);
}
for handle in vec {
handle.await.unwrap()
}
info!("{:?}", now.elapsed());
let real_digest = sha256::digest(&*FOO);
let entries = std::fs::read_dir("./tmp/sftp").unwrap();
let mut number = 0;
for entry in entries {
let entry = entry?;
let file_name = entry.file_name();
let file_name = format!("./tmp/sftp/{}", file_name.to_string_lossy());
if entry.file_type()?.is_file() {
let digest = sha256::try_digest(file_name).unwrap();
assert_eq!(digest, real_digest);
number += 1;
}
}
assert_eq!(number, TASK_NUM * SINGLE_TASK_FILE);
Ok(())
}
async fn write(sftp: Sftp, task_index: usize) {
let data = FOO.as_bytes();
for i in 0..SINGLE_TASK_FILE {
let file_name = format!("/sftp_upload/file_{}_{}", task_index, i);
let mut file = sftp.create(&file_name).compat().await.unwrap();
let file_id = format!("{:?}", file);
info!("get file_id: {},file_name:{}", file_id, file_name);
file.write_all(data.as_ref()).compat().await.unwrap();
file.flush().await.unwrap();
info!("end drop file,file_id: {},file_name:{}", file_id, file_name);
}
}