#![allow(non_snake_case, unused_imports, dead_code, non_camel_case_types, non_upper_case_globals)]
include!("escp/bindings.rs");
mod license;
extern crate clap;
extern crate flatbuffers;
pub use zstd_safe;
use clap::Parser;
use std::{env, process, thread, collections::HashMap};
use std::ffi::{CString, CStr};
use regex::Regex;
use subprocess::{Popen, PopenConfig, Redirection};
use std::{io, io::Read, io::Write, fs, slice, collections::VecDeque};
use std::os::unix::net::{UnixStream,UnixListener};
use std::os::fd::{AsRawFd, FromRawFd};
use log::{{debug, info, error}};
use std::mem::MaybeUninit;
#[allow(dead_code, unused_imports, clippy::all)]
mod file_spec;
mod session_init;
mod message;
/// Builds a finished FlatBuffer whose root is a `Session_Init` table.
///
/// `$i` is a single token tree that is pasted directly after
/// `session_init::Session_InitArgs`, i.e. the braced field list of the
/// argument struct. The macro evaluates to the `FlatBufferBuilder` (created
/// with an initial capacity of 128 bytes) after `finish` has been called on it.
macro_rules! sess_init {
    ($i:tt) => {{
        let mut fbb = flatbuffers::FlatBufferBuilder::with_capacity(128);
        let root = session_init::Session_Init::create(
            &mut fbb,
            &session_init::Session_InitArgs $i
        );
        fbb.finish(root, None);
        fbb
    }};
}
pub mod logging;
pub mod receiver;
pub mod sender;
// Numeric tags, one per message kind.
// NOTE(review): presumably these are the `typ` values carried in the 6-byte
// header built by `to_header` — confirm against the sender/receiver modules.
const msg_session_init:u16 = 8;
const msg_file_spec:u16 = 16;
const msg_file_stat:u16 = 17;
const msg_keepalive:u16 = 18;
const msg_message:u16 = 19;
// NOTE(review): 128 is a single high bit, so this may be combined with another
// tag as a flag rather than used on its own — confirm.
const msg_compressed:u16 =128;
const msg_session_complete:u16 = 1;
const msg_session_terminate:u16 = 9;
// Keys that `load_yaml` copies out of the configuration file; it only looks
// up the names listed here.
const config_items: &[&str]= &[ "cpumask", "nodemask" ];
/// Debug helper: prints the compiler's name for the type of the referenced
/// value to stdout. The value itself is never inspected.
fn _print_type_of<T>(_: &T) {
    let type_name = std::any::type_name::<T>();
    println!("{}", type_name)
}
/// Converts a human-readable quantity such as `"512"`, `"1M"` or `"10mbit"`
/// into a plain integer.
///
/// The input is split at the first ASCII letter. Everything before it must
/// parse as a `u64`; everything after it is the (case-insensitive) unit:
///
/// * The first unit letter selects the exponent: `k`=1, `m`=2, `g`=3, `t`=4,
///   `p`=5, `e`=6. Any other letter (or no unit) means exponent 0.
/// * If the unit contains `"bit"`, the base is 1000 and the result is divided
///   by 8; otherwise the base is 1024.
///
/// # Returns
/// The scaled value, or `0` (after printing `Unable to parse <input>`) when
/// the numeric part is not a valid `u64` or the scaled value does not fit in
/// a `u64`.
pub fn int_from_human(str: String) -> u64 {
    // Same split point as a `[a-zA-Z]` regex search, without compiling a
    // regex on every call.
    let idx = str
        .find(|c: char| c.is_ascii_alphabetic())
        .unwrap_or(str.len());
    let (number, unit) = str.split_at(idx);

    let val = match number.parse::<u64>() {
        Ok(value) => value,
        Err(_) => {
            println!("Unable to parse {}", str);
            return 0;
        }
    };

    let unit = unit.to_lowercase();

    // The index of a prefix letter in this table is its exponent; index 0 is
    // a placeholder so that 'k' lands on 1.
    const UNIT_NAMES: &str = " kmgtpe";
    let pow = unit
        .chars()
        .next()
        .and_then(|prefix| UNIT_NAMES.find(prefix))
        .unwrap_or(0) as u32;

    let (multiplier, divisor): (u64, u64) = if unit.contains("bit") {
        (1000, 8)
    } else {
        (1024, 1)
    };

    // Checked arithmetic: inputs like "16E" exceed u64 and previously
    // panicked (debug) or wrapped to a bogus value (release).
    let scaled = multiplier
        .checked_pow(pow)
        .and_then(|scale| val.checked_mul(scale));

    match scaled {
        Some(total) => total / divisor,
        None => {
            println!("Unable to parse {}", str);
            0
        }
    }
}
/// Serializes a header as six bytes: `sz` as a big-endian `u32` followed by
/// `typ` as a big-endian `u16`.
fn to_header(sz: u32, typ: u16) -> Vec<u8> {
    let mut header = Vec::with_capacity(6);
    header.extend_from_slice(&sz.to_be_bytes());
    header.extend_from_slice(&typ.to_be_bytes());
    header
}
/// Decodes the first six bytes of `i`: bytes 0..4 as a big-endian `u32` and
/// bytes 4..6 as a big-endian `u16`. Extra bytes are ignored.
///
/// # Panics
/// Panics if `i` holds fewer than six bytes.
fn from_header(i: Vec<u8>) -> (u32, u16) {
    let size = u32::from_be_bytes([i[0], i[1], i[2], i[3]]);
    let kind = u16::from_be_bytes([i[4], i[5]]);
    (size, kind)
}
// `dtn_args_wrapper` is built around a raw `*mut dtn_args` (see `do_escp`),
// and raw pointers are neither `Send` nor `Sync` by default; these impls opt
// the wrapper in manually so it can be handed to other threads.
// SAFETY: NOTE(review): nothing visible in this file synchronizes access to
// the pointee — soundness depends on how the wrapped struct is used by the
// sender/receiver code and the C side; confirm.
unsafe impl Send for logging::dtn_args_wrapper {}
unsafe impl Sync for logging::dtn_args_wrapper {}
/// Wraps the raw argument pointer and runs either the receiver (when
/// `do_server` is set on `args`) or the sender.
///
/// `args` is dereferenced here, so it must point to a live `dtn_args`.
fn do_escp(args: *mut dtn_args, flags: &EScp_Args) {
    let run_as_server = unsafe { (*args).do_server };
    let shared = logging::dtn_args_wrapper { args };

    if !run_as_server {
        sender::escp_sender(shared, flags);
        return;
    }
    receiver::escp_receiver(shared, flags);
}
/// Forwards entries obtained from `fc_pop` onto two channels until an entry
/// with `file_no == 0` is seen.
///
/// For every non-null entry, `(file_no, crc)` is sent on `fc_in` and
/// `(file_no, blocks)` on `fc2_in`; send errors are ignored. A null result
/// from `fc_pop` is simply retried.
/// NOTE(review): if `fc_pop` does not block, the null branch is a busy loop —
/// confirm against the C implementation.
fn fc_worker(fc_in: crossbeam_channel::Sender<(u64, u32)>,
             fc2_in: crossbeam_channel::Sender<(u64, u64)>) {
    loop {
        let entry = unsafe { fc_pop() };
        if entry.is_null() {
            continue;
        }

        // `entry` is non-null from here on; it is only read, never freed.
        unsafe {
            debug!("fc_worker: fn={} bytes={} blocks={} crc={:#X} complete={}",
                (*entry).file_no, (*entry).bytes, (*entry).blocks, (*entry).crc, (*entry).completion);

            let file_no = (*entry).file_no;
            if file_no == 0 {
                // A zero file number ends the worker.
                debug!("fc_worker: returning because file_no == 0");
                return;
            }

            let _ = fc_in.send((file_no, (*entry).crc));
            let _ = fc2_in.send((file_no, (*entry).blocks));
        }
    }
}
// Command-line arguments, parsed by clap's derive API in `start_escp`.
//
// Plain `//` comments are used on purpose: clap derive turns `///` doc
// comments into help text, which would change the program's output.
//
// Fields that `start_escp` does not read are consumed elsewhere in the crate
// (the struct is passed by reference to the sender and receiver).
#[derive(Parser, Debug)]
#[command( author, long_version=logging::build::CLAP_LONG_VERSION, about, long_about = None )]
pub struct EScp_Args {
// Positional arguments. `start_escp` treats the last one as the destination
// and, when at least two are given, removes it from this list.
#[arg()]
source: Vec<String>,
// Hidden `--dest`; `start_escp` overwrites it with the last positional
// argument (or an empty string when there are none).
#[arg(hide=true, long="dest", default_value_t=String::from(""))]
destination: String,
#[arg(short='P', long="port", hide_default_value=true, default_value_t = 22)]
ssh_port: u16,
// Copied into `active_port` of the C-side arguments.
#[arg(long="escp_port", default_value_t = 1232)]
escp_port: u16,
#[arg(long="escp_portrange", default_value_t = 10)]
escp_portrange: u16,
// Raises `verbose_logging`; a warning is printed when no log file is set.
#[arg(short, long, num_args=0)]
verbose: bool,
#[arg(long="logfile", default_value_t=String::from(""), hide_default_value=true)]
log_file: String,
// Resets `verbose_logging` to 0, overriding `verbose`.
#[arg(short, long, num_args=0)]
quiet: bool,
// Human-readable quantity parsed by `int_from_human` into `pacing`.
#[arg(short, long, hide_default_value=true, default_value_t = String::from("0"))]
limit: String,
// Sets `do_preserve`.
#[arg(short, long, num_args=0)]
preserve: bool,
// Sets `compression = 1`.
#[arg(short='C', long="compress")]
compression: bool,
// Sets `sparse = 1`; a warning is printed when used without compression.
#[arg(long)]
sparse: bool,
// Sets `recursive`.
#[arg(short='r', long, num_args=0)]
recursive: bool,
#[arg(short='o', hide_default_value=true, default_value_t=String::from(""))]
ssh_option: String,
#[arg(short='S', long="ssh", default_value_t=String::from("ssh"))]
ssh: String,
#[arg(short='D', long="escp", default_value_t=String::from("escp"))]
escp: String,
// Parsed by `int_from_human`; values below 256K are raised to 256K.
#[arg(long="blocksize", default_value_t = String::from("1M"))]
block_sz: String,
// Lower-cased and looked up in the engine table inside `start_escp`
// (posix, uring, dummy, shmem); an unknown name terminates the program.
#[arg(long="ioengine", default_value_t = String::from("posix"),
help="posix,dummy")]
io_engine: String,
// Copied into `thread_count`.
#[arg(short='t', long="parallel", default_value_t = 4 )]
threads: u32,
#[arg(long, hide=true, help="mgmt UDS/IPC connection", default_value_t=String::from(""))]
mgmt: String,
#[arg(long, help="Display speed in bits/s")]
bits: bool,
// Copied into `nodirect`.
#[arg(long, help="Disable direct (O_DIRECT) mode")]
nodirect: bool,
// Inverted into `do_hash`.
#[arg(long, help="disable checksum")]
nochecksum: bool,
#[arg(short='A', long="agent")]
agent: bool,
#[arg(short, long, hide_default_value=true, default_value_t=String::from(""))]
cipher: String,
#[arg(short='i', long, hide_default_value=true, default_value_t=String::from(""))]
identity: String,
#[arg(short='F', long="ssh_config", default_value_t=String::from(""))]
ssh_config: String,
#[arg(short='J', long="jump_host", default_value_t=String::from(""))]
jump_host: String,
#[arg(short='B', hide=true )]
batch_mode: bool,
// Prints the license text and exits.
#[arg(short='L', long, help="Display License")]
license: bool,
// Copied into `do_server`; `do_escp` runs the receiver when it is set.
// Also lifts the "at least two positional arguments" requirement.
#[arg(long, hide=true )]
server: bool,
// The remaining hidden single-letter switches are not read in this file.
// NOTE(review): presumably accepted for scp command-line compatibility —
// confirm where (or whether) they are honoured.
#[arg(short='3', hide=true)]
three: bool,
#[arg(short, hide=true)]
s: bool,
#[arg(short='O', hide=true )]
O: bool,
#[arg(short='T', hide=true)]
disable_strict_filename: bool,
#[arg(short='4', hide=true)]
ipv4: bool,
#[arg(short='6', hide=true)]
ipv6: bool,
#[arg(short='R', hide=true)]
ssh_from_origin: bool,
}
/// Reads the string-valued settings named in `config_items` from the first
/// YAML document in `file_str`.
///
/// # Returns
/// A map from setting name to value. The map is empty when the file cannot
/// be opened or contains no YAML document (empty or comment-only file).
/// Settings that are absent or not strings are skipped.
///
/// # Panics
/// Panics if the file can be opened but not read as UTF-8 text, or if its
/// contents are not valid YAML.
fn load_yaml(file_str: &str) -> HashMap<String, String> {
    let mut map = HashMap::new();

    // A missing or inaccessible config file is not an error: run with defaults.
    let mut file = match std::fs::File::open(file_str) {
        Ok(value) => value,
        Err(_) => return map,
    };

    let mut contents = String::new();
    file.read_to_string(&mut contents)
        .expect("Unable to read file");

    let docs = yaml_rust2::YamlLoader::load_from_str(&contents)
        .expect("Error parsing YAML File");

    // An empty file parses successfully into zero documents; indexing
    // `docs[0]` would panic, so treat it like a file with no settings.
    let Some(doc) = docs.first() else {
        return map;
    };

    for key in config_items {
        if let Some(value) = doc[*key].as_str() {
            map.insert(key.to_string(), value.to_string());
        }
    }
    map
}
pub fn start_escp() {
let config = load_yaml("/etc/escp.conf");
let args =
unsafe {
args_new()
};
if config.contains_key("cpumask") {
unsafe{
(*args).do_affinity = true;
let res = hex::decode( config["cpumask"].as_str() ).expect("Bad cpumask");
let mut len = res.len();
if len > 31 {
len = 32;
}
std::ptr::copy_nonoverlapping(
res.as_ptr() as *mut i8,
(*args).cpumask_bytes.as_ptr() as *mut i8, len);
(*args).cpumask_len = len as i32;
}
}
if config.contains_key("nodemask") {
unsafe{
(*args).nodemask = u64::from_str_radix(
config["nodemask"].as_str(), 16).expect("Bad nodemask");
}
}
let io_engine_names = HashMap::from( [
("posix", 1),
("uring", 2),
("dummy", 3),
("shmem", 4),
]);
{
let mut flags = EScp_Args::parse();
let l = flags.source.len();
flags.destination=flags.source.last().unwrap_or(&String::new()).to_string();
if l >= 2 {
flags.source.truncate(l-1);
}
unsafe {
let io_engine = flags.io_engine.to_lowercase();
(*args).io_engine = io_engine_names.get(&io_engine.as_str()).cloned().unwrap_or(-1);
(*args).io_engine_name = io_engine.as_ptr() as *mut i8;
(*args).block = int_from_human(flags.block_sz.to_string()) as i32;
if (*args).block < 256*1024 {
eprintln!("setting block size to 256K, which is minimum block size");
(*args).block = 256*1024;
}
if (*args).io_engine == -1 {
eprintln!("io_engine='{}' not in compiled io_engines {:?}",
io_engine, io_engine_names.keys());
process::exit(0);
}
if flags.verbose {
if flags.log_file.is_empty() {
eprintln!("Warning: verbose logging enabled but no log file output.")
}
verbose_logging += 1;
}
if flags.compression { (*args).compression = 1; }
if flags.preserve { (*args).do_preserve= true; }
if flags.sparse {
if !flags.compression {
eprintln!("Warning: Sparse enabled without compression.")
}
(*args).sparse = 1;
}
if flags.verbose { verbose_logging += 1; }
if flags.quiet { verbose_logging = 0; }
(*args).nodirect = flags.nodirect;
(*args).do_hash = !flags.nochecksum;
if flags.recursive { (*args).recursive = true; }
(*args).pacing = int_from_human(flags.limit.clone());
(*args).window = 512*1024*1024;
(*args).mtu=8204;
(*args).thread_count = flags.threads as i32;
(*args).QD = 4;
(*args).do_server = flags.server;
(*args).active_port = flags.escp_port;
(*args).flags = libc::O_RDONLY;
}
if flags.license {
license::print_license();
process::exit(0);
};
if l < 2 && !flags.server {
eprintln!("Error: Not enough arguments\n");
use clap::CommandFactory;
let mut cmd = EScp_Args::command();
let _ = cmd.print_help();
process::exit(0);
}
do_escp( args, &flags );
};
}