use IPC::Open3;
use POSIX qw(:sys_wait_h setsid ceil :errno_h);
use Symbol qw(gensym);
use File::Temp qw(tempfile tempdir);
use File::Path;
use Getopt::Long;
use strict;
use File::Basename;
# Main program flow: initialise, parse options, open the input sources,
# build the job queue, run the jobs, and exit with the resulting status.
# $HOME is used later to build paths under ~/.parallel; fall back to /tmp.
if(not $ENV{HOME}) {
::warning("\$HOME not set. Using /tmp\n");
$ENV{HOME} = "/tmp";
}
save_stdin_stdout_stderr();
save_original_signal_handler();
parse_options();
::debug("init", "Open file descriptors: ", join(" ",keys %Global::fd), "\n");
# Number of arguments per command line: an explicit maximum wins;
# -X, -m and --xargs leave it undefined; otherwise one argument per job.
my $number_of_args;
if($Global::max_number_of_args) {
$number_of_args=$Global::max_number_of_args;
} elsif ($opt::X or $opt::m or $opt::xargs) {
$number_of_args = undef;
} else {
$number_of_args = 1;
}
# What is left in @ARGV after option parsing is the command template.
my @command;
@command = @ARGV;
# Open the input sources. With --pipepart only /dev/null is opened here
# (the -a files are split further down by pipe_part_files()). Otherwise
# every -a file is opened, falling back to STDIN unless --pipe is given.
my @fhlist;
if($opt::pipepart) {
@fhlist = map { open_or_exit($_) } "/dev/null";
} else {
@fhlist = map { open_or_exit($_) } @opt::a;
if(not @fhlist and not $opt::pipe) {
@fhlist = (*STDIN);
}
}
# --skip-first-line: read and discard one line from the first input source.
if($opt::skip_first_line) {
my $fh = $fhlist[0];
<$fh>;
}
# --header without --pipe: the first line of every input source holds the
# column names. Each {name} in the command (optionally followed by /, //,
# . or /.) is rewritten to its positional form {N}, and the name is
# recorded in %Global::input_source_header under N.
if($opt::header and not $opt::pipe) {
my $fh = $fhlist[0];
my $delimiter = $opt::colsep;
# No column separator: split on the regexp '$' instead.
$delimiter ||= "\$";
my $id = 1;
for my $fh (@fhlist) {
my $line = <$fh>;
chomp($line);
::debug("init", "Delimiter: '$delimiter'");
for my $s (split /$delimiter/o, $line) {
::debug("init", "Colname: '$s'");
for(@command) {
s:\{$s(|/|//|\.|/\.)\}:\{$id$1\}:g;
}
$Global::input_source_header{$id} = $s;
$id++;
}
}
} else {
# No header line: every input source is named by its 1-based index.
my $id = 1;
for my $fh (@fhlist) {
$Global::input_source_header{$id} = $id;
$id++;
}
}
if($opt::filter_hosts and (@opt::sshlogin or @opt::sshloginfile)) {
filter_hosts();
}
# --onall/--nonall are handled entirely by onall(); afterwards the program
# exits with the accumulated exit status, capped at 254.
if($opt::nonall or $opt::onall) {
onall(@command);
wait_and_exit(min(undef_as_zero($Global::exitstatus),254));
}
$Global::JobQueue = JobQueue->new(
\@command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files);
# --eta and --bar call total_jobs() before any job is started.
if($opt::eta or $opt::bar) {
$Global::JobQueue->total_jobs();
}
# --pipepart: build one cat_partial command per block of every -a file.
# One command line per block is taken from the queue with get() and the
# whole list is handed back with unget().
if($opt::pipepart) {
@Global::cat_partials = map { pipe_part_files($_) } @opt::a;
$Global::JobQueue->{'commandlinequeue'}->unget(
map { $Global::JobQueue->{'commandlinequeue'}->get() } @Global::cat_partials
);
}
# Call max_jobs_running() on every host; the return value is not used here.
for my $sshlogin (values %Global::host) {
$sshlogin->max_jobs_running();
}
init_run_jobs();
my $sem;
if($Global::semaphore) {
$sem = acquire_semaphore();
}
# From here on SIGTERM is handled by start_no_new_jobs.
$SIG{TERM} = \&start_no_new_jobs;
start_more_jobs();
# With --pipe (but not --pipepart) STDIN is distributed to the jobs.
if(not $opt::pipepart) {
if($opt::pipe) {
spreadstdin();
}
}
::debug("init", "Start draining\n");
drain_job_queue();
::debug("init", "Done draining\n");
reaper();
::debug("init", "Done reaping\n");
# --pipe together with -a: for every job in @Global::tee_jobs remove the
# file registered for fd 2, clear that name, print the job's output and
# remove the file registered for fd 1.
if($opt::pipe and @opt::a) {
for my $job (@Global::tee_jobs) {
unlink $job->fh(2,"name");
$job->set_fh(2,"name","");
$job->print();
unlink $job->fh(1,"name");
}
}
::debug("init", "Cleaning\n");
cleanup();
if($Global::semaphore) {
$sem->release();
}
# Send TERM to every pid recorded as a key of %Global::sshmaster.
for(keys %Global::sshmaster) {
kill "TERM", $_;
}
::debug("init", "Halt\n");
# With --halt the dedicated halt exit status is used; otherwise the
# accumulated exit status capped at 254.
if($opt::halt_on_error) {
wait_and_exit($Global::halt_on_error_exitstatus);
} else {
wait_and_exit(min(undef_as_zero($Global::exitstatus),254));
}
# Empty sub used as a section marker.
sub __PIPE_MODE__ {}

# Build the shell commands that each output one block of $file (--pipepart).
# Input:
#   $file = name of the file to split
# Returns:
#   list of commands as produced by cat_partial(), one per pair of
#   neighbouring split positions; each command emits the header range
#   followed by its own block.
sub pipe_part_files {
    my ($file) = @_;
    my $scratch = "";
    my $header_len = length(find_header(\$scratch, open_or_exit($file)));
    my @boundary = find_split_positions($file, $opt::blocksize, $header_len);
    return map {
        cat_partial($file, 0, $header_len, $boundary[$_], $boundary[$_ + 1])
    } 0 .. $#boundary - 1;
}
# Read the header (--header) from a filehandle.
# Input:
#   $buf_ref = reference to the read buffer; data read from $fh is appended
#              to it and a matched header is removed from its start
#   $fh      = filehandle to read from
# Uses:
#   $opt::header    = ":" (one line), a number N (N lines) or a regexp;
#                     the first two forms are rewritten to a regexp in place
#   $opt::blocksize = number of bytes requested per read
# Returns:
#   the header text, or "" when --header is not set or nothing matched
sub find_header {
    my ($buf_ref, $fh) = @_;
    $opt::header or return "";
    $opt::header eq ":" and $opt::header = "(.*\n)";
    $opt::header =~ s/^(\d+)$/"(.*\n)"x$1/e;
    # Keep appending blocks until the buffer starts with a full header.
    while(read($fh, substr(${$buf_ref}, length ${$buf_ref}, 0), $opt::blocksize)) {
        ${$buf_ref} =~ s/^($opt::header)// and return $1;
    }
    return "";
}
# Find the byte positions at which $file can be split into blocks of
# roughly $block bytes without cutting a record in half (--pipepart).
# Input:
#   $file      = name of the file to split
#   $block     = wanted block size in bytes (truncated to an integer,
#                at least 1 so the scan always advances)
#   $headerlen = length of the header; the first position
# Uses:
#   recstartrecend() for the record separators and $opt::regexp to decide
#   whether they are regexps or fixed strings.
# Returns:
#   list of positions: $headerlen, every record boundary found, and the
#   file size. A boundary lies after the record end and before the next
#   record start, so no part begins with the previous record's terminator.
sub find_split_positions {
    my($file, $block, $headerlen) = @_;
    my $size = -s $file;
    $block = int $block;
    # A block size of 0 would never move the scan position forward.
    $block = 1 if $block < 1;
    my $dd_block_size = 131072;
    my @pos;
    my ($recstart,$recend) = recstartrecend();
    my $recendrecstart = $recend.$recstart;
    my $fh = ::open_or_exit($file);
    push(@pos,$headerlen);
    for(my $pos = $block+$headerlen; $pos < $size; $pos += $block) {
        my $buf = "";
        seek($fh, $pos, 0) or die "Cannot seek to $pos in $file: $!";
        # Read on from $pos until a record boundary shows up in $buf.
        while(read($fh,substr($buf,length $buf,0),$dd_block_size)) {
            if($opt::regexp) {
                if($buf =~ /(.*$recend)$recstart/os) {
                    # $1 includes the record end: the split is after it.
                    my $i = length($1);
                    push(@pos,$pos+$i);
                    # Continue looking after this match.
                    $pos += $i;
                    last;
                }
            } else {
                my $i = index($buf,$recendrecstart);
                if($i != -1) {
                    # index() points at the start of the record end; the
                    # record end belongs to the part before the split.
                    $i += length $recend;
                    push(@pos,$pos+$i);
                    # Continue looking after this match.
                    $pos += $i;
                    last;
                }
            }
        }
    }
    push(@pos,$size);
    close $fh;
    return @pos;
}
# Build a shell command that outputs selected byte ranges of a file.
# Input:
#   $file      = the file to read (given to the command as its stdin)
#   @start_end = (start, end, start, end, ...) byte positions
# Returns:
#   a command string: the shell-quoted file as stdin redirection, a perl
#   one-liner that seeks and copies, and the ranges as "start length" pairs
sub cat_partial {
    my($file, @start_end) = @_;
    # Convert (start, end) pairs into (start, length) pairs.
    my @start_len;
    while(@start_end) {
        my $start = shift @start_end;
        push @start_len, $start;
        @start_end and push @start_len, shift(@start_end) - $start;
    }
    my $redirect = "<" . shell_quote_scalar($file);
    my $copier =
        q{ perl -e 'while(@ARGV) { sysseek(STDIN,shift,0) || die; $left = shift; while($read = sysread(STDIN,$buf, ($left > 32768 ? 32768 : $left))){ $left -= $read; syswrite(STDOUT,$buf); } }' };
    return $redirect . $copier . " @start_len";
}
# Read STDIN in blocks and pass complete records on to the jobs through
# write_record_to_pipe() (used when --pipe is given).
# Uses:
#   $opt::blocksize, $opt::r, $opt::regexp, $opt::roundrobin,
#   $Global::max_lines, $Global::max_number_of_args, %Global::running
# Side effects:
#   sets $Global::start_no_new_jobs when all input has been read
sub spreadstdin {
my $buf = "";
my ($recstart,$recend) = recstartrecend();
my $recendrecstart = $recend.$recstart;
my $chunk_number = 1;
my $one_time_through;
my $blocksize = $opt::blocksize;
my $in = *STDIN;
my $header = find_header(\$buf,$in);
while(1) {
my $anything_written = 0;
# Append up to $blocksize bytes to $buf. When nothing could be read the
# loop ends, except that the body is run once if no chunk was written yet.
if(not read($in,substr($buf,length $buf,0),$blocksize)) {
$chunk_number != 1 and last;
$one_time_through++ and last;
}
if($opt::r) {
# -r: remove lines that contain only whitespace.
$buf =~ s/^\s*\n//gm;
if(length $buf == 0) {
next;
}
}
if($Global::max_lines and not $Global::max_number_of_args) {
# A maximum number of lines but no maximum number of args: pass on the
# largest prefix whose line count is a multiple of $Global::max_lines.
my $n_lines = $buf =~ tr/\n/\n/;
my $last_newline_pos = rindex($buf,"\n");
while($n_lines % $Global::max_lines) {
$n_lines--;
$last_newline_pos = rindex($buf,"\n",$last_newline_pos-1);
}
$anything_written +=
write_record_to_pipe($chunk_number++,\$header,\$buf,
$recstart,$recend,$last_newline_pos+1);
substr($buf,0,$last_newline_pos+1) = "";
} elsif($opt::regexp) {
# The record separators are regexps.
if($Global::max_number_of_args) {
# Pass on $read_n_lines records at a time while another record start
# follows them in the buffer.
my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1);
while($buf =~ s/((?:$recstart.*?$recend){$read_n_lines})($recstart.*)$/$2/os) {
my $b = $1;
$anything_written +=
write_record_to_pipe($chunk_number++,\$header,\$b,
$recstart,$recend,length $1);
}
} else {
# Pass on everything up to the last record end that is followed by a
# record start; the remainder stays in the buffer.
if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) {
my $b = $1;
$anything_written +=
write_record_to_pipe($chunk_number++,\$header,\$b,
$recstart,$recend,length $1);
}
}
} else {
# The record separators are fixed strings.
if($Global::max_number_of_args) {
my $i = 0;
my $read_n_lines = $Global::max_number_of_args * ($Global::max_lines || 1);
# Cut after the $read_n_lines'th record boundary, as long as one is found.
while(($i = nindex(\$buf,$recendrecstart,$read_n_lines)) != -1) {
$i += length $recend; $anything_written +=
write_record_to_pipe($chunk_number++,\$header,\$buf,
$recstart,$recend,$i);
substr($buf,0,$i) = "";
}
} else {
# Cut after the last record boundary in the buffer. The record end is
# included in the part that is passed on.
my $i = rindex($buf,$recendrecstart);
if($i != -1) {
$i += length $recend; $anything_written +=
write_record_to_pipe($chunk_number++,\$header,\$buf,
$recstart,$recend,$i);
substr($buf,0,$i) = "";
}
}
}
# Nothing was written although more input remains: grow the block size
# by 30% (plus one) for the following reads.
if(not $anything_written and not eof($in)) {
my $old_blocksize = $blocksize;
$blocksize = ceil($blocksize * 1.3 + 1);
::warning("A record was longer than $old_blocksize. " .
"Increasing to --blocksize $blocksize\n");
}
}
::debug("init", "Done reading input\n");
substr($buf,0,0) = "";
# Pass on whatever is left in the buffer as the last chunk.
write_record_to_pipe($chunk_number++,\$header,\$buf,$recstart,$recend,length $buf);
$Global::start_no_new_jobs ||= 1;
if($opt::roundrobin) {
# --round-robin: close the fh(0,"w") handle of every running job, then
# keep calling non_block_write() on each job until its stdin buffer
# length is zero.
for my $job (values %Global::running) {
close $job->fh(0,"w");
}
my %incomplete_jobs = %Global::running;
my $sleep = 1;
while(keys %incomplete_jobs) {
my $something_written = 0;
for my $pid (keys %incomplete_jobs) {
my $job = $incomplete_jobs{$pid};
if($job->stdin_buffer_length()) {
$something_written += $job->non_block_write();
} else {
delete $incomplete_jobs{$pid}
}
}
# Shorten the sleep when progress was made.
if($something_written) {
$sleep = $sleep/2+0.001;
}
$sleep = ::reap_usleep($sleep);
}
}
}
# Compute the record start and record end separators.
# Uses:
#   $opt::recstart, $opt::recend, $opt::regexp
# Returns:
#   ($recstart, $recend)
#   If only one of --recstart/--recend is given, the other is "".
#   With --regexp both are wrapped in a non-capturing group; otherwise the
#   escapes \0 \r \n \t \' \" \\ are replaced by the characters they name.
sub recstartrecend {
    my ($recstart, $recend);
    if(defined $opt::recstart or defined $opt::recend) {
        $recstart = defined $opt::recstart ? $opt::recstart : "";
        $recend = defined $opt::recend ? $opt::recend : "";
    }
    if($opt::regexp) {
        $recstart = "(?:".$recstart.")";
        $recend = "(?:".$recend.")";
        return ($recstart, $recend);
    }
    # The loop variable aliases the two separators, so they are edited
    # in place.
    for my $separator ($recstart, $recend) {
        $separator =~ s/\\([0rnt\'\"\\])/"qq|\\$1|"/gee;
    }
    return ($recstart, $recend);
}
# Find the position of the N'th occurrence of a string in a buffer.
# Input:
#   $buf_ref = reference to the buffer to search
#   $str     = the string to look for
#   $n       = which occurrence is wanted
# Returns:
#   the position of the $n'th occurrence, or -1 if there are fewer.
#   The search starts at offset 1, so an occurrence at offset 0 is not
#   counted. For $n < 1 the result is 0.
sub nindex {
    my ($buf_ref, $str, $n) = @_;
    my $found_at = 0;
    for my $occurrence (1 .. $n) {
        $found_at = index(${$buf_ref}, $str, $found_at + 1);
        return -1 if $found_at == -1;
    }
    return $found_at;
}
{
    # Jobs still to be visited; kept between calls so that consecutive
    # blocks continue where the previous call stopped.
    my @robin_queue;

    # Give one block to the first running job whose stdin buffer is empty.
    # Jobs visited before that get a chance to write more of their buffer.
    # Input:
    #   $header_ref = reference to the header
    #   $block_ref  = reference to the buffer holding the block
    #   $recstart   = record start
    #   $recend     = record end
    #   $endpos     = end position of the block inside the buffer
    # Returns:
    #   sum of the values returned by non_block_write()
    sub round_robin_write {
        my ($header_ref,$block_ref,$recstart,$recend,$endpos) = @_;
        my $written = 0;
        my $delivered = 0;
        my $sleep = 1;
        until($delivered) {
            # Start a new round when the previous one is used up.
            @robin_queue or push @robin_queue, values %Global::running;
            while(my $job = shift @robin_queue) {
                if($job->stdin_buffer_length() > 0) {
                    # Still busy with an earlier block: just write more.
                    $written += $job->non_block_write();
                    next;
                }
                $job->set_stdin_buffer($header_ref,$block_ref,$endpos,$recstart,$recend);
                $delivered = 1;
                $job->set_virgin(0);
                $written += $job->non_block_write();
                last;
            }
            $sleep = ::reap_usleep($sleep);
        }
        return $written;
    }
}
# Hand one chunk of input to a job.
# Input:
#   $chunk_number = sequence number of the chunk
#   $header_ref   = reference to the header that is written first
#   $record_ref   = reference to the buffer holding the record(s)
#   $recstart     = record start
#   $recend       = record end
#   $endpos       = number of bytes of the buffer that belong to the chunk
# Returns:
#   0 if the chunk is empty ($endpos == 0)
#   1 if the chunk was handed over or is marked in $Global::job_already_run
#   the result of round_robin_write() under --round-robin
# Exits with status 255 if no child process can be forked.
sub write_record_to_pipe {
    my ($chunk_number,$header_ref,$record_ref,$recstart,$recend,$endpos) = @_;
    if($endpos == 0) { return 0; }
    if(vec($Global::job_already_run,$chunk_number,1)) { return 1; }
    if($opt::roundrobin) {
        return round_robin_write($header_ref,$record_ref,$recstart,$recend,$endpos);
    }
    # Wait for a job that has not received any input yet.
    my $sleep = 0.0001;
    while(not @Global::virgin_jobs) {
        ::debug("pipe", "No virgin jobs");
        $sleep = ::reap_usleep($sleep);
        start_more_jobs();
    }
    my $job = shift @Global::virgin_jobs;
    $job->set_virgin(0);
    my $pid = fork();
    if(not defined $pid) {
        # fork() failed. Treating undef as "child" would make the parent
        # write the record itself and then exit(0), silently dropping the
        # rest of the input.
        ::error("Cannot fork to write record $chunk_number: $!\n");
        ::wait_and_exit(255);
    }
    if($pid == 0) {
        # Child: write the chunk to the job's stdin and exit, so the
        # parent is free to continue reading.
        # Chop off everything after $endpos; only the child's copy of the
        # buffer is changed.
        substr($$record_ref,$endpos,length $$record_ref) = "";
        if($opt::remove_rec_sep) {
            Job::remove_rec_sep($record_ref,$recstart,$recend);
        }
        $job->write($header_ref);
        $job->write($record_ref);
        close $job->fh(0,"w");
        exit(0);
    }
    # Parent: the child holds its own copy of the handle.
    close $job->fh(0,"w");
    return 1;
}
# Empty sub used as a section marker.
sub __SEM_MODE__ {}

# Acquire the semaphore named $Semaphore::name.
# With $Semaphore::fg set the semaphore is kept by this process.
# Otherwise it is released again, the process forks, the parent exits,
# and the child starts a new session and acquires the semaphore itself.
# Side effects:
#   registers the local host as $Global::host{':'}
# Returns:
#   the acquired Semaphore object
sub acquire_semaphore {
    my $localhost = $Global::host{':'} = SSHLogin->new(":");
    my $sem = Semaphore->new($Semaphore::name, $localhost->max_jobs_running());
    $sem->acquire();
    return $sem if $Semaphore::fg;

    # Run in the background.
    $sem->release();
    fork() and exit(0);
    if(setsid() == -1) {
        ::die_bug("Can't start a new session: $!");
    }
    $sem = Semaphore->new($Semaphore::name, $localhost->max_jobs_running());
    $sem->acquire();
    return $sem;
}
# Empty sub used as a section marker.
sub __PARSE_OPTIONS__ {}
# Returns the option specification used with GetOptions: a list of
# (option spec => reference to the variable receiving the value) pairs.
# Several option specs may share one destination variable.
sub options_hash {
return
("debug|D=s" => \$opt::D,
"xargs" => \$opt::xargs,
"m" => \$opt::m,
"X" => \$opt::X,
"v" => \@opt::v,
"joblog=s" => \$opt::joblog,
"results|result|res=s" => \$opt::results,
"resume" => \$opt::resume,
"resume-failed|resumefailed" => \$opt::resume_failed,
"silent" => \$opt::silent,
"keep-order|keeporder|k" => \$opt::keeporder,
"group" => \$opt::group,
# Retired options all set $opt::retired.
"g" => \$opt::retired,
"ungroup|u" => \$opt::ungroup,
"linebuffer|linebuffered|line-buffer|line-buffered" => \$opt::linebuffer,
"tmux" => \$opt::tmux,
"null|0" => \$opt::0,
"quote|q" => \$opt::q,
"parens=s" => \$opt::parens,
"rpl=s" => \@opt::rpl,
"plus" => \$opt::plus,
"I=s" => \$opt::I,
"extensionreplace|er=s" => \$opt::U,
"U=s" => \$opt::retired,
"basenamereplace|bnr=s" => \$opt::basenamereplace,
"dirnamereplace|dnr=s" => \$opt::dirnamereplace,
"basenameextensionreplace|bner=s" => \$opt::basenameextensionreplace,
"seqreplace=s" => \$opt::seqreplace,
"slotreplace=s" => \$opt::slotreplace,
"jobs|j=s" => \$opt::jobs,
"delay=f" => \$opt::delay,
"sshdelay=f" => \$opt::sshdelay,
"load=s" => \$opt::load,
"noswap" => \$opt::noswap,
"max-line-length-allowed" => \$opt::max_line_length_allowed,
"number-of-cpus" => \$opt::number_of_cpus,
"number-of-cores" => \$opt::number_of_cores,
"use-cpus-instead-of-cores" => \$opt::use_cpus_instead_of_cores,
"shellquote|shell_quote|shell-quote" => \$opt::shellquote,
"nice=i" => \$opt::nice,
"timeout=s" => \$opt::timeout,
"tag" => \$opt::tag,
"tagstring|tag-string=s" => \$opt::tagstring,
"onall" => \$opt::onall,
"nonall" => \$opt::nonall,
"filter-hosts|filterhosts|filter-host" => \$opt::filter_hosts,
"sshlogin|S=s" => \@opt::sshlogin,
"sshloginfile|slf=s" => \@opt::sshloginfile,
"controlmaster|M" => \$opt::controlmaster,
"return=s" => \@opt::return,
"trc=s" => \@opt::trc,
"transfer" => \$opt::transfer,
"cleanup" => \$opt::cleanup,
"basefile|bf=s" => \@opt::basefile,
"B=s" => \$opt::retired,
"ctrlc|ctrl-c" => \$opt::ctrlc,
"noctrlc|no-ctrlc|no-ctrl-c" => \$opt::noctrlc,
"workdir|work-dir|wd=s" => \$opt::workdir,
"W=s" => \$opt::retired,
# --tmpdir and --tempdir set the same variable.
"tmpdir=s" => \$opt::tmpdir,
"tempdir=s" => \$opt::tmpdir,
"use-compress-program|compress-program=s" => \$opt::compress_program,
"use-decompress-program|decompress-program=s" => \$opt::decompress_program,
"compress" => \$opt::compress,
"tty" => \$opt::tty,
"T" => \$opt::retired,
"halt-on-error|halt=s" => \$opt::halt_on_error,
"H=i" => \$opt::retired,
"retries=i" => \$opt::retries,
"dry-run|dryrun" => \$opt::dryrun,
"progress" => \$opt::progress,
"eta" => \$opt::eta,
"bar" => \$opt::bar,
"arg-sep|argsep=s" => \$opt::arg_sep,
"arg-file-sep|argfilesep=s" => \$opt::arg_file_sep,
"trim=s" => \$opt::trim,
"env=s" => \@opt::env,
"recordenv|record-env" => \$opt::record_env,
"plain" => \$opt::plain,
"profile|J=s" => \@opt::profile,
"pipe|spreadstdin" => \$opt::pipe,
"robin|round-robin|roundrobin" => \$opt::roundrobin,
"recstart=s" => \$opt::recstart,
"recend=s" => \$opt::recend,
"regexp|regex" => \$opt::regexp,
"remove-rec-sep|removerecsep|rrs" => \$opt::remove_rec_sep,
"files|output-as-files|outputasfiles" => \$opt::files,
"block|block-size|blocksize=s" => \$opt::blocksize,
"tollef" => \$opt::retired,
"gnu" => \$opt::gnu,
"xapply" => \$opt::xapply,
"bibtex" => \$opt::bibtex,
"nn|nonotice|no-notice" => \$opt::no_notice,
# --max-procs/-P sets the same variable as --jobs/-j.
"max-procs|P=s" => \$opt::jobs,
"delimiter|d=s" => \$opt::d,
"max-chars|s=i" => \$opt::max_chars,
"arg-file|a=s" => \@opt::a,
"no-run-if-empty|r" => \$opt::r,
"replace|i:s" => \$opt::i,
# -E and --eof/-e set the same variable.
"E=s" => \$opt::eof,
"eof|e:s" => \$opt::eof,
"max-args|n=i" => \$opt::max_args,
"max-replace-args|N=i" => \$opt::max_replace_args,
"colsep|col-sep|C=s" => \$opt::colsep,
"help|h" => \$opt::help,
"L=f" => \$opt::L,
"max-lines|l:f" => \$opt::max_lines,
"interactive|p" => \$opt::p,
"verbose|t" => \$opt::verbose,
"version|V" => \$opt::version,
"minversion|min-version=i" => \$opt::minversion,
"show-limits|showlimits" => \$opt::show_limits,
"exit|x" => \$opt::x,
"semaphore" => \$opt::semaphore,
"semaphoretimeout=i" => \$opt::semaphoretimeout,
"semaphorename|id=s" => \$opt::semaphorename,
"fg" => \$opt::fg,
"bg" => \$opt::bg,
"wait" => \$opt::wait,
"shebang|hashbang" => \$opt::shebang,
"internal-pipe-means-argfiles" => \$opt::internal_pipe_means_argfiles,
"Y" => \$opt::retired,
"skip-first-line" => \$opt::skip_first_line,
"header=s" => \$opt::header,
"cat" => \$opt::cat,
"fifo" => \$opt::fifo,
"pipepart|pipe-part" => \$opt::pipepart,
"hgrp|hostgroup|hostgroups" => \$opt::hostgroups,
);
}
# Run GetOptions on an arbitrary array.
# Input:
#   $array_ref = reference to the array to parse; the parsed options are
#                removed from it
#   @keep_only = option specs (keys of options_hash()) to keep; all other
#                options are parsed but stored in a throw-away array.
#                An empty list keeps every option.
# Returns:
#   1 if the array is empty, otherwise the return value of GetOptions
sub get_options_from_array {
    my ($array_ref, @keep_only) = @_;
    @{$array_ref} or return 1;
    # GetOptions works on @ARGV: swap the array in unless it is @ARGV.
    my $is_real_argv = (\@::ARGV == $array_ref);
    my @argv_backup;
    if(not $is_real_argv) {
        @argv_backup = @::ARGV;
        @::ARGV = @{$array_ref};
    }
    my %spec = options_hash();
    if(@keep_only) {
        my %wanted = map { $_ => $_ } @keep_only;
        my @sink;
        for my $name (keys %spec) {
            $wanted{$name} or $spec{$name} = \@sink;
        }
    }
    my $parsed_ok = GetOptions(%spec);
    if(not $is_real_argv) {
        # Hand back what is left and restore the real @ARGV.
        @{$array_ref} = @::ARGV;
        @::ARGV = @argv_backup;
    }
    return $parsed_ok;
}
# Set the defaults, read the options (profiles, $PARALLEL, command line)
# and translate them into the $Global::* / $opt::* settings used by the
# rest of the program. Options that only print information exit from here.
sub parse_options {
$Global::version = 20141122;
$Global::progname = 'parallel';
$Global::infinity = 2**31;
$Global::debug = 0;
$Global::verbose = 0;
$Global::quoting = 0;
# Replacement strings and the perl expression each one stands for.
%Global::replace =
(
'{}' => '',
'{#}' => '1 $_=$job->seq()',
'{%}' => '1 $_=$job->slot()',
'{/}' => 's:.*/::',
'{//}' => '$Global::use{"File::Basename"} ||= eval "use File::Basename; 1;"; $_ = dirname($_);',
'{/.}' => 's:.*/::; s:\.[^/.]+$::;',
'{.}' => 's:\.[^/.]+$::',
);
# Extra replacement strings that are only active with --plus.
%Global::plus =
(
'{+/}' => 's:/[^/]*$::',
'{+.}' => 's:.*\.::',
'{+..}' => 's:.*\.([^.]*\.):$1:',
'{+...}' => 's:.*\.([^.]*\.[^.]*\.):$1:',
'{..}' => 's:\.[^/.]+$::; s:\.[^/.]+$::',
'{...}' => 's:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::',
'{/..}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::',
'{/...}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::',
);
%Global::rpl = %Global::replace;
$Global::parens = "{==}";
$/="\n";
$Global::ignore_empty = 0;
$Global::interactive = 0;
$Global::stderr_verbose = 0;
$Global::default_simultaneous_sshlogins = 9;
$Global::exitstatus = 0;
$Global::halt_on_error_exitstatus = 0;
$Global::arg_sep = ":::";
$Global::arg_file_sep = "::::";
$Global::trim = 'n';
$Global::max_jobs_running = 0;
$Global::job_already_run = '';
$ENV{'TMPDIR'} ||= "/tmp";
@ARGV=read_options();
# The verbosity level is the number of times -v was given.
if(@opt::v) { $Global::verbose = $#opt::v+1; } $Global::debug = $opt::D;
$Global::shell = $ENV{'PARALLEL_SHELL'} || parent_shell($$) || $ENV{'SHELL'} || "/bin/sh";
if(defined $opt::X) { $Global::ContextReplace = 1; }
if(defined $opt::silent) { $Global::verbose = 0; }
if(defined $opt::0) { $/ = "\0"; }
# The delimiter is run through sprintf by a string eval, so escapes such
# as \t in the option value are interpreted.
if(defined $opt::d) { my $e="sprintf \"$opt::d\""; $/ = eval $e; }
if(defined $opt::p) { $Global::interactive = $opt::p; }
if(defined $opt::q) { $Global::quoting = 1; }
if(defined $opt::r) { $Global::ignore_empty = 1; }
if(defined $opt::verbose) { $Global::stderr_verbose = 1; }
# Rename a replacement string: the definition stored under $old in
# %Global::rpl is moved to $new.
sub rpl {
my ($old,$new) = @_;
if($old ne $new) {
$Global::rpl{$new} = $Global::rpl{$old};
delete $Global::rpl{$old};
}
}
# --parens: the first half of the string is the left parenthesis, the
# rest is the right one.
if(defined $opt::parens) { $Global::parens = $opt::parens; }
my $parenslen = 0.5*length $Global::parens;
$Global::parensleft = substr($Global::parens,0,$parenslen);
$Global::parensright = substr($Global::parens,$parenslen);
# Entries already in %Global::rpl win over the --plus entries.
if(defined $opt::plus) { %Global::rpl = (%Global::plus,%Global::rpl); }
if(defined $opt::I) { rpl('{}',$opt::I); }
if(defined $opt::U) { rpl('{.}',$opt::U); }
if(defined $opt::i and $opt::i) { rpl('{}',$opt::i); }
if(defined $opt::basenamereplace) { rpl('{/}',$opt::basenamereplace); }
if(defined $opt::dirnamereplace) { rpl('{//}',$opt::dirnamereplace); }
if(defined $opt::seqreplace) { rpl('{#}',$opt::seqreplace); }
if(defined $opt::slotreplace) { rpl('{%}',$opt::slotreplace); }
if(defined $opt::basenameextensionreplace) {
rpl('{/.}',$opt::basenameextensionreplace);
}
# --rpl 'shorthand perl-expression': split at the first space.
for(@opt::rpl) {
my ($shorthand,$long) = split/ /,$_,2;
$Global::rpl{$shorthand} = $long;
}
if(defined $opt::eof) { $Global::end_of_file_string = $opt::eof; }
if(defined $opt::max_args) { $Global::max_number_of_args = $opt::max_args; }
if(defined $opt::timeout) { $Global::timeoutq = TimeoutQueue->new($opt::timeout); }
if(defined $opt::tmpdir) { $ENV{'TMPDIR'} = $opt::tmpdir; }
if(defined $opt::help) { die_usage(); }
if(defined $opt::colsep) { $Global::trim = 'lr'; }
# --header without --colsep uses TAB as the column separator.
if(defined $opt::header) { $opt::colsep = defined $opt::colsep ? $opt::colsep : "\t"; }
if(defined $opt::trim) { $Global::trim = $opt::trim; }
if(defined $opt::arg_sep) { $Global::arg_sep = $opt::arg_sep; }
if(defined $opt::arg_file_sep) { $Global::arg_file_sep = $opt::arg_file_sep; }
# Options that print a value and exit.
if(defined $opt::number_of_cpus) { print SSHLogin::no_of_cpus(),"\n"; wait_and_exit(0); }
if(defined $opt::number_of_cores) {
print SSHLogin::no_of_cores(),"\n"; wait_and_exit(0);
}
if(defined $opt::max_line_length_allowed) {
print Limits::Command::real_max_length(),"\n"; wait_and_exit(0);
}
if(defined $opt::version) { version(); wait_and_exit(0); }
if(defined $opt::bibtex) { bibtex(); wait_and_exit(0); }
if(defined $opt::record_env) { record_env(); wait_and_exit(0); }
if(defined $opt::show_limits) { show_limits(); }
if(@opt::sshlogin) { @Global::sshlogin = @opt::sshlogin; }
if(@opt::sshloginfile) { read_sshloginfiles(@opt::sshloginfile); }
if(@opt::return) { push @Global::ret_files, @opt::return; }
# With neither --recstart nor --recend, records end with a newline.
if(not defined $opt::recstart and
not defined $opt::recend) { $opt::recend = "\n"; }
if(not defined $opt::blocksize) { $opt::blocksize = "1M"; }
$opt::blocksize = multiply_binary_prefix($opt::blocksize);
if(defined $opt::controlmaster) { $opt::noctrlc = 1; }
# Any of the semaphore related options switches on semaphore mode.
if(defined $opt::semaphore) { $Global::semaphore = 1; }
if(defined $opt::semaphoretimeout) { $Global::semaphore = 1; }
if(defined $opt::semaphorename) { $Global::semaphore = 1; }
if(defined $opt::fg) { $Global::semaphore = 1; }
if(defined $opt::bg) { $Global::semaphore = 1; }
if(defined $opt::wait) { $Global::semaphore = 1; }
# A --halt value containing % is divided by 100.
if(defined $opt::halt_on_error and
$opt::halt_on_error=~/%/) { $opt::halt_on_error /= 100; }
if(defined $opt::timeout and $opt::timeout !~ /^\d+(\.\d+)?%?$/) {
::error("--timeout must be seconds or percentage\n");
wait_and_exit(255);
}
# --minversion: print the version; exit 255 if it is older than asked for.
if(defined $opt::minversion) {
print $Global::version,"\n";
if($Global::version < $opt::minversion) {
wait_and_exit(255);
} else {
wait_and_exit(0);
}
}
if(not defined $opt::delay) {
$opt::delay = $opt::sshdelay;
}
# A compress program implies --compress; the decompress program defaults
# to the compress program with -dc.
if($opt::compress_program) {
$opt::compress = 1;
$opt::decompress_program ||= $opt::compress_program." -dc";
}
if($opt::compress) {
my ($compress, $decompress) = find_compression_program();
$opt::compress_program ||= $compress;
$opt::decompress_program ||= $decompress;
}
# --nonall: append an argument group holding one empty argument.
if(defined $opt::nonall) {
push @ARGV, $Global::arg_sep, "";
}
if(defined $opt::tty) {
if(not defined $opt::jobs) {
$opt::jobs = 1;
}
if(not defined $opt::group) {
$opt::ungroup = 0;
}
}
# --trc files are returned, and imply --transfer and --cleanup.
if(@opt::trc) {
push @Global::ret_files, @opt::trc;
$opt::transfer = 1;
$opt::cleanup = 1;
}
if(defined $opt::max_lines) {
# "-0" here means the option bundle -l0: one line per job, NUL separated.
if($opt::max_lines eq "-0") {
$opt::max_lines = 1;
$opt::0 = 1;
$/ = "\0";
} elsif ($opt::max_lines == 0) {
$opt::max_lines = 1;
}
$Global::max_lines = $opt::max_lines;
if(not $opt::pipe) {
$Global::max_number_of_args ||= $Global::max_lines;
}
}
if(defined $opt::L) {
$Global::max_lines = $opt::L;
if(not $opt::pipe) {
$Global::max_number_of_args ||= $Global::max_lines;
}
}
if(defined $opt::max_replace_args) {
$Global::max_number_of_args = $opt::max_replace_args;
$Global::ContextReplace = 1;
}
if((defined $opt::L or defined $opt::max_replace_args)
and
not ($opt::xargs or $opt::m)) {
$Global::ContextReplace = 1;
}
if(defined $opt::tag and not defined $opt::tagstring) {
$opt::tagstring = "\257<\257>"; }
if(defined $opt::pipepart and
(defined $opt::L or defined $opt::max_lines
or defined $opt::max_replace_args)) {
::error("--pipepart is incompatible with --max-replace-args, ",
"--max-lines, and -L.\n");
wait_and_exit(255);
}
# Argument groups (::: and ::::) on the command line are moved to @opt::a.
if(grep /^$Global::arg_sep$|^$Global::arg_file_sep$/o, @ARGV) {
@ARGV=read_args_from_command_line();
}
# Being called under the name 'sem' also switches on semaphore mode.
$Global::semaphore ||= ($0 =~ m:(^|/)sem$:); if($Global::semaphore) {
@opt::a = ("/dev/null");
push(@Global::unget_argv, [Arg->new("")]);
$Semaphore::timeout = $opt::semaphoretimeout || 0;
if(defined $opt::semaphorename) {
$Semaphore::name = $opt::semaphorename;
} else {
# Without --semaphorename the output of 'tty' is used as the name.
$Semaphore::name = `tty`;
chomp $Semaphore::name;
}
$Semaphore::fg = $opt::fg;
$Semaphore::wait = $opt::wait;
$Global::default_simultaneous_sshlogins = 1;
if(not defined $opt::jobs) {
$opt::jobs = 1;
}
if($Global::interactive and $opt::bg) {
::error("Jobs running in the ".
"background cannot be interactive.\n");
::wait_and_exit(255);
}
}
if(defined $opt::eta) {
$opt::progress = $opt::eta;
}
if(defined $opt::bar) {
$opt::progress = $opt::bar;
}
# All retired options share $opt::retired, so it is not known which one
# was used: every message is printed.
if(defined $opt::retired) {
::error("-g has been retired. Use --group.\n");
::error("-B has been retired. Use --bf.\n");
::error("-T has been retired. Use --tty.\n");
::error("-U has been retired. Use --er.\n");
::error("-W has been retired. Use --wd.\n");
::error("-Y has been retired. Use --shebang.\n");
::error("-H has been retired. Use --halt.\n");
::error("--tollef has been retired. Use -u -q --arg-sep -- and --load for -l.\n");
::wait_and_exit(255);
}
citation_notice();
parse_sshlogin();
parse_env_var();
if(remote_hosts() and ($opt::X or $opt::m or $opt::xargs)) {
::warning("Using -X or -m with --sshlogin may fail.\n");
}
if(not defined $opt::jobs) {
$opt::jobs = "100%";
}
open_joblog();
}
# Quote the value of an environment variable for use in shell code.
# Input:
#   $_[0] = the value to quote
# Returns:
#   the value with backslashes doubled, shell special characters
#   backslash-escaped, and every newline wrapped in double quotes
sub env_quote {
    my ($quoted) = @_;
    for ($quoted) {
        # Backslashes first, so the ones added below are not doubled.
        s/([\\])/\\$1/g;
        s/([\[\] \#\'\&\<\>\(\)\;\{\}\t\"\$\`\*\174\!\?\~])/\\$1/g;
        s/\n/"\n"/g;
    }
    return $quoted;
}
# Write the names of all current environment variables, one per line, to
# $HOME/.parallel/ignored_vars. Exits with status 255 if the file cannot
# be opened for writing.
sub record_env {
    my $ignore_filename = $ENV{'HOME'} . "/.parallel/ignored_vars";
    my $vars_fh;
    if(not open($vars_fh, ">", $ignore_filename)) {
        ::error("Cannot write to $ignore_filename\n");
        ::wait_and_exit(255);
    }
    print $vars_fh map { $_,"\n" } keys %ENV;
}
# Build the shell code that recreates the environment variables and
# functions selected with --env.
# Uses:
#   @opt::env, %ENV, $Global::shell
# Sets:
#   $Global::envvar    = shell code setting the variables/functions
#   $Global::envvarlen = length of $Global::envvar
#   $Global::envwarn   = shell code that warns csh/tcsh users when a value
#                        contains a newline
sub parse_env_var {
$Global::envvar = "";
$Global::envwarn = "";
# 'parallel_bash_environment' is always considered.
my @vars = ('parallel_bash_environment');
for my $varstring (@opt::env) {
push @vars, split /,/, $varstring;
}
# The name '_' stands for: every variable in %ENV that is not listed in
# $HOME/.parallel/ignored_vars (the file written by record_env()).
if(grep { /^_$/ } @vars) {
if(open(my $vars_fh, "<", $ENV{'HOME'} . "/.parallel/ignored_vars")) {
my @ignore = <$vars_fh>;
chomp @ignore;
my %ignore;
@ignore{@ignore} = @ignore;
close $vars_fh;
push @vars, grep { not defined $ignore{$_} } keys %ENV;
@vars = grep { not /^_$/ } @vars;
} else {
::error("Run '$Global::progname --record-env' in a clean environment first.\n");
::wait_and_exit(255);
}
}
# For every name also look for BASH_FUNC_name(); keep only the names
# that exist in %ENV.
@vars = map { $_, "BASH_FUNC_$_()" } @vars;
@vars = grep { defined($ENV{$_}) } @vars;
# A value starting with "() {" is treated as a shell function.
my @bash_functions = grep { substr($ENV{$_},0,4) eq "() {" } @vars;
my @non_functions = grep { substr($ENV{$_},0,4) ne "() {" } @vars;
if(@bash_functions) {
if($Global::shell !~ m:/(bash|rbash|zsh|rzsh|dash|ksh):) {
::warning("Shell functions may not be supported in $Global::shell\n");
}
}
# Function names without "()" are exported under the plain name; names
# containing "()" use the BASH_FUNC_name() form.
my @bash_pre_shellshock = grep { not /\(\)/ } @bash_functions;
my @bash_post_shellshock = grep { /\(\)/ } @bash_functions;
# csh form: setenv NAME VALUE (parallel_bash_environment is left out).
my @qcsh = (map { my $a=$_; "setenv $a " . env_quote($ENV{$a}) }
grep { not /^parallel_bash_environment$/ } @non_functions);
# Bourne shell form: export NAME=VALUE, followed by the functions.
my @qbash = (map { my $a=$_; "export $a=" . env_quote($ENV{$a}) }
@non_functions, @bash_pre_shellshock);
push @qbash, map { my $a=$_; "eval $a\"\$$a\"" } @bash_pre_shellshock;
push @qbash, map { /BASH_FUNC_(.*)\(\)/; "$1 $ENV{$_}" } @bash_post_shellshock;
# Values containing a newline: prepare a warning for csh/tcsh.
# Note: the substitution strips BASH_FUNC_...() from the matching
# elements of @vars in place.
if(my @v = map { s/BASH_FUNC_(.*)\(\)/$1/; $_ } grep { $ENV{$_}=~/\n/ } @vars) {
$Global::envwarn = ::shell_quote_scalar(q{echo $SHELL | grep -E "/t?csh" > /dev/null && echo CSH/TCSH DO NOT SUPPORT newlines IN VARIABLES/FUNCTIONS. Unset }."@v".q{ && exec false;}."\n\n") . $Global::envwarn;
}
# Make sure both command lists contain at least one command.
if(not @qcsh) { push @qcsh, "true"; }
if(not @qbash) { push @qbash, "true"; }
if(@vars) {
# If $SHELL looks like csh/tcsh run the setenv commands, otherwise the
# export commands.
$Global::envvar .=
join"",
(q{echo $SHELL | grep "/t\\{0,1\\}csh" > /dev/null && }
. join(" && ", @qcsh)
. q{ || }
. join(" && ", @qbash)
.q{;});
if($ENV{'parallel_bash_environment'}) {
$Global::envvar .= 'eval "$parallel_bash_environment";'."\n";
}
}
$Global::envvarlen = length $Global::envvar;
}
# Open the --joblog file and, with --resume/--resume-failed, read which
# jobs it already contains.
# Uses:
#   $opt::joblog, $opt::results, $opt::resume, $opt::resume_failed
# Sets:
#   $Global::joblog          = filehandle the job log is written to
#   $Global::job_already_run = bit vector with one bit per job sequence
#                              number found in the existing job log
# Exits with status 255 on a usage error, a malformed job log, or a job
# log that cannot be opened for writing.
sub open_joblog {
my $append = 0;
if(($opt::resume or $opt::resume_failed)
and
not ($opt::joblog or $opt::results)) {
::error("--resume and --resume-failed require --joblog or --results.\n");
::wait_and_exit(255);
}
if($opt::joblog) {
if($opt::resume || $opt::resume_failed) {
if(open(my $joblog_fh, "<", $opt::joblog)) {
# Read the first line. $append becomes true if the job log has
# content, which selects appending below.
$append = <$joblog_fh>; my $joblog_regexp;
if($opt::resume_failed) {
# Only lines whose 7th and 8th fields (the Exitval and Signal
# columns of the header written below) are both 0 count as already run.
$joblog_regexp='^(\d+)(?:\t[^\t]+){5}\t0\t0\t';
} else {
# Every line starting with a sequence number counts as already run.
$joblog_regexp='^(\d+)';
}
while(<$joblog_fh>) {
if(/$joblog_regexp/o) {
vec($Global::job_already_run,($1||0),1) = 1;
} elsif(not /\d+\s+[^\s]+\s+([0-9.]+\s+){6}/) {
::error("Format of '$opt::joblog' is wrong: $_");
::wait_and_exit(255);
}
}
close $joblog_fh;
}
}
if($append) {
if(not open($Global::joblog, ">>", $opt::joblog)) {
::error("Cannot append to --joblog $opt::joblog.\n");
::wait_and_exit(255);
}
} else {
# Start a new job log; "-" means the saved file descriptor 1.
if($opt::joblog eq "-") {
$Global::joblog = $Global::fd{1};
} elsif(not open($Global::joblog, ">", $opt::joblog)) {
::error("Cannot write to --joblog $opt::joblog.\n");
::wait_and_exit(255);
}
print $Global::joblog
join("\t", "Seq", "Host", "Starttime", "JobRuntime",
"Send", "Receive", "Exitval", "Signal", "Command"
). "\n";
}
}
}
# Pick the first available compression program from a fixed list.
# Returns:
#   ($compress_command, $decompress_command) for the first program that
#   which() finds, or ("cat","cat") if none of them is found
sub find_compression_program {
    for my $candidate (qw(lzop pigz pxz gzip plzip pbzip2 lzma xz lzip bzip2)) {
        which($candidate) or next;
        return ("$candidate -c -1", "$candidate -dc");
    }
    return ("cat","cat");
}
# Read the options from the profiles, from $PARALLEL and from the command
# line, in that order.
# Returns:
#   the non-option words: those left from the profiles, then those left
#   from $PARALLEL, then those left on the command line
sub read_options {
# --shebang, --hashbang or --shebang-wrap at the start of the first
# argument: rebuild the command line and re-exec.
if(defined $ARGV[0] and ($ARGV[0] =~ /^--shebang/ or
$ARGV[0] =~ /^--shebang-?wrap/ or
$ARGV[0] =~ /^--hashbang/)) {
# Each substitution removes the option from the first argument and
# tells whether it was there.
$opt::shebang_wrap = ($ARGV[0] =~ s/^--shebang-?wrap *//);
$opt::shebang = ($ARGV[0] =~ s/^--shebang *//);
$opt::shebang .= ($ARGV[0] =~ s/^--hashbang *//);
if($opt::shebang) {
# The last argument is used as the argument file, skipping its first line.
my $argfile = shell_quote_scalar(pop @ARGV);
exec "$0 --skip-first-line -a $argfile @ARGV";
}
if($opt::shebang_wrap) {
my @options;
my @parser;
if ($^O eq 'freebsd') {
# On FreeBSD the options are separate elements of @ARGV: parse a copy
# to find how many leading elements are options; the words up to :::
# are the parser command.
my @nooptions = @ARGV;
get_options_from_array(\@nooptions);
while($#ARGV > $#nooptions) {
push @options, shift @ARGV;
}
while(@ARGV and $ARGV[0] ne ":::") {
push @parser, shift @ARGV;
}
if(@ARGV and $ARGV[0] eq ":::") {
shift @ARGV;
}
} else {
# Elsewhere the first element is taken as the options.
@options = shift @ARGV;
}
my $script = shell_quote_scalar(shift @ARGV);
exec "$0 --internal-pipe-means-argfiles @options @parser $script ::: @ARGV";
}
}
Getopt::Long::Configure("bundling","require_order");
# First pass on a copy of @ARGV: only --profile/-J and --plain are kept.
my @ARGV_copy = @ARGV;
get_options_from_array(\@ARGV_copy,"profile|J=s","plain") || die_usage();
my @ARGV_profile = ();
my @ARGV_env = ();
if(not $opt::plain) {
my @config_profiles = (
"/etc/parallel/config",
$ENV{'HOME'}."/.parallel/config",
$ENV{'HOME'}."/.parallelrc");
my @profiles = @config_profiles;
# Profiles named with --profile replace the default config files. A name
# that is not a readable file is looked up in $HOME/.parallel/.
if(@opt::profile) {
@profiles = ();
for my $profile (@opt::profile) {
if(-r $profile) {
push @profiles, $profile;
} else {
push @profiles, $ENV{'HOME'}."/.parallel/".$profile;
}
}
}
for my $profile (@profiles) {
if(-r $profile) {
open (my $in_fh, "<", $profile) || ::die_bug("read-profile: $profile");
while(<$in_fh>) {
# Skip comment lines; split the rest into shell words.
/^\s*\#/ and next;
chomp;
push @ARGV_profile, shellwords($_);
}
close $in_fh;
} else {
# A missing default config file is fine; a missing named profile is
# an error.
if(grep /^$profile$/, @config_profiles) {
} else {
::error("$profile not readable.\n");
wait_and_exit(255);
}
}
}
if($ENV{'PARALLEL'}) {
@ARGV_env = shellwords($ENV{'PARALLEL'});
}
}
Getopt::Long::Configure("bundling","require_order");
# Later sources are parsed last: the command line is parsed after
# $PARALLEL, which is parsed after the profiles.
get_options_from_array(\@ARGV_profile) || die_usage();
get_options_from_array(\@ARGV_env) || die_usage();
get_options_from_array(\@ARGV) || die_usage();
unshift @ARGV, @ARGV_profile, @ARGV_env;
return @ARGV;
}
# Move the argument groups given on the command line to @opt::a.
# A group starts with $Global::arg_sep or $Global::arg_file_sep and runs
# until the next separator or the end of @ARGV.
# Uses:
#   @ARGV (emptied), $Global::arg_sep, $Global::arg_file_sep,
#   $opt::internal_pipe_means_argfiles, $opt::pipe
# Side effects:
#   pushes file names or filehandles onto @opt::a
# Returns:
#   the words of @ARGV that are not part of any group
sub read_args_from_command_line {
my @new_argv = ();
for(my $arg = shift @ARGV; @ARGV; $arg = shift @ARGV) {
if($arg eq $Global::arg_sep
or
$arg eq $Global::arg_file_sep) {
my $group = $arg; my @group;
# Collect the members of the group.
while(defined ($arg = shift @ARGV)) {
if($arg eq $Global::arg_sep
or
$arg eq $Global::arg_file_sep) {
last;
} else {
push @group, $arg;
}
}
if($group eq $Global::arg_file_sep
or ($opt::internal_pipe_means_argfiles and $opt::pipe)
) {
# The members are names of argument files.
push @opt::a, @group;
} elsif($group eq $Global::arg_sep) {
# The members are arguments: write them, each followed by $/, to an
# already unlinked temporary file and use the rewound filehandle as
# the input source.
my ($outfh,$name) = ::tmpfile(SUFFIX => ".arg");
unlink($name);
print $outfh map { $_,$/ } @group;
seek $outfh, 0, 0;
push @opt::a, $outfh;
} else {
::die_bug("Unknown command line group: $group");
}
if(defined($arg)) {
# $arg holds the separator that ended the group: handle it as the
# start of the next group without shifting @ARGV again.
redo;
} else {
last;
}
}
push @new_argv, $arg;
}
return @new_argv;
}
# Clean up after all jobs are done: calls cleanup_basefile() if any
# --basefile was given.
sub cleanup {
    cleanup_basefile() if @opt::basefile;
}
# Empty sub used as a section marker.
sub __QUOTING_ARGUMENTS_FOR_SHELL__ {}

# Quote strings so the shell passes them on unchanged.
# Input:
#   @_ = the strings to quote (they are not modified)
# Returns:
#   in list context the quoted strings, in scalar context the quoted
#   strings joined by $" (a space by default).
#   Special characters get a backslash in front; a newline becomes '\n'
#   with a literal newline inside the single quotes.
sub shell_quote {
    my @quoted = @_;
    for (@quoted) {
        s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g;
        s/[\n]/'\n'/g;
    }
    return "@quoted" unless wantarray;
    return @quoted;
}
# Like shell_quote(), except that an empty string is turned into ''
# so it survives as an argument of its own.
# Returns:
#   in list context the quoted strings, in scalar context the quoted
#   strings joined by $" (a space by default)
sub shell_quote_empty {
    my @quoted = map { $_ eq "" ? "''" : $_ } shell_quote(@_);
    return wantarray ? @quoted : "@quoted";
}
# Quote a single string so the shell passes it on unchanged.
# Input:
#   $_[0] = the string to quote (it is not modified)
# Returns:
#   the quoted string, or undef if the input is undef.
#   Special characters get a backslash in front; a newline becomes '\n'
#   with a literal newline inside the single quotes.
sub shell_quote_scalar {
    my ($quoted) = @_;
    defined $quoted or return $quoted;
    for ($quoted) {
        s/[\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'\202-\377]/\\$&/go;
        s/[\n]/'\n'/go;
    }
    return $quoted;
}
# Quote a file name for the shell and make sure it cannot be taken for an
# option: names that start with neither / nor ./ get ./ put in front.
# Input:
#   the file name
# Returns:
#   the quoted file name, or undef if the input is undef
sub shell_quote_file {
    my $quoted = shell_quote_scalar(shift);
    return $quoted if not defined $quoted;
    unless($quoted =~ m:^/: or $quoted =~ m:^\./:) {
        $quoted = "./".$quoted;
    }
    return $quoted;
}
# Split strings into words the way a shell would, using
# Text::ParseWords::shellwords. The module is loaded on first use and the
# result of loading it is remembered in %Global::use.
sub shellwords {
    if(not $Global::use{"Text::ParseWords"}) {
        $Global::use{"Text::ParseWords"} = eval "use Text::ParseWords; 1;";
    }
    return Text::ParseWords::shellwords(@_);
}
# Empty sub used as a section marker.
sub __FILEHANDLES__ {}

# Remember the file descriptors the program was started with.
# Sets:
#   %Global::fd              = filehandle for every open fd in 1..61
#   $Global::original_stderr = duplicate of STDERR
#   $Global::original_stdin  = duplicate of STDIN
#   $Global::is_terminal     = true if the original stderr is a terminal
#                              and none of the environment variables
#                              CIRCLECI, GITHUB_ACTIONS, TRAVIS is set
sub save_stdin_stdout_stderr {
    for my $fdno (1..61) {
        # The open succeeds only for file descriptors that are in use.
        open(my $fh, ">&=$fdno") or next;
        $Global::fd{$fdno} = $fh;
    }
    open $Global::original_stderr, ">&", "STDERR" or
        ::die_bug("Can't dup STDERR: $!");
    open $Global::original_stdin, "<&", "STDIN" or
        ::die_bug("Can't dup STDIN: $!");
    my $in_ci = $ENV{'CIRCLECI'} || $ENV{'GITHUB_ACTIONS'} || $ENV{'TRAVIS'};
    $Global::is_terminal = (-t $Global::original_stderr) && !$in_ci;
}
# Check that enough filehandles are free to start another job.
# With --ungroup nothing is tested and the answer is always 1.
# Otherwise 7 + 2 + (number of entries in %Global::fd) filehandles are
# opened on /dev/null as a probe and closed again.
# Returns:
#   true if every probe could be opened, false otherwise
sub enough_file_handles {
    return 1 if $opt::ungroup;
    my %probe;
    my $enough = 1;
    my $needed = 7 + 2 + keys %Global::fd;
    for my $i (1 .. $needed) {
        $enough &&= open($probe{$i}, "<", "/dev/null");
    }
    close $_ for values %probe;
    return $enough;
}
# Open a file for reading or exit the program.
# Input:
#   $file = a file name, "-" for standard input, or a GLOB reference to an
#           already open filehandle
# Side effects:
#   "-" sets $Global::stdin_in_opt_a
# Returns:
#   a filehandle: the saved original stdin (or *STDIN) for "-", the GLOB
#   reference itself, or a new handle on the named file.
#   Exits with status 255 if the file cannot be opened.
sub open_or_exit {
    my ($file) = @_;
    if($file eq "-") {
        $Global::stdin_in_opt_a = 1;
        return ($Global::original_stdin || *STDIN);
    }
    return $file if ref($file) eq "GLOB";
    my $fh = gensym;
    open($fh, "<", $file) and return $fh;
    ::error("Cannot open input file `$file': No such file or directory.\n");
    wait_and_exit(255);
    return $fh;
}
# Empty sub used as a section marker.
sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {}

# Prepare for running jobs: reset the job counters, install the USR1 and
# USR2 signal handlers, and call setup_basefile() if any --basefile was
# given.
sub init_run_jobs {
    $Global::total_running = $Global::total_started = $Global::tty_taken = 0;
    @SIG{qw(USR1 USR2)} = (\&list_running_jobs, \&toggle_progress);
    setup_basefile() if @opt::basefile;
}
{
# Time of the last check for changed files, and the last seen mtime of
# every --sshloginfile. Both are kept between calls.
my $last_time;
my %last_mtime;
# Start as many jobs as the hosts have room for.
# Uses:
#   %Global::host, $Global::JobQueue, $Global::start_no_new_jobs,
#   $Global::max_procs_file, @opt::sshloginfile, $opt::pipe, $opt::load,
#   $opt::noswap, $opt::delay, $Global::newest_starttime
# Returns:
#   the number of jobs started by this call
sub start_more_jobs {
my $jobs_started = 0;
my $jobs_started_this_round = 0;
if($Global::start_no_new_jobs) {
return $jobs_started;
}
# When more than one second has passed since the last check, look for
# changes in the files that control the number of jobs and the hosts.
if(time - ($last_time||0) > 1) {
$last_time = time;
if($Global::max_procs_file) {
# The max procs file changed: reset max_jobs_running on every host
# to undef.
my $mtime = (stat($Global::max_procs_file))[9];
if($mtime > $Global::max_procs_file_last_mod) {
$Global::max_procs_file_last_mod = $mtime;
for my $sshlogin (values %Global::host) {
$sshlogin->set_max_jobs_running(undef);
}
}
}
if(@opt::sshloginfile) {
for my $slf (@opt::sshloginfile) {
my $actual_file = expand_slf_shorthand($slf);
my $mtime = (stat($actual_file))[9];
$last_mtime{$actual_file} ||= $mtime;
# The file is more than a second newer than last seen: set
# max_jobs_running to 0 on all known hosts and read the file again.
if($mtime - $last_mtime{$actual_file} > 1) {
::debug("run","--sshloginfile $actual_file changed. reload\n");
$last_mtime{$actual_file} = $mtime;
@Global::sshlogin = ();
for (values %Global::host) {
$_->set_max_jobs_running(0);
}
read_sshloginfile($actual_file);
parse_sshlogin();
$opt::filter_hosts and filter_hosts();
setup_basefile();
}
}
}
}
# Go through the hosts over and over, starting at most one job per host
# per round, until a whole round starts nothing.
do {
$jobs_started_this_round = 0;
for my $sshlogin (values %Global::host) {
# Without --pipe an empty queue means there is nothing left to start.
if($Global::JobQueue->empty() and not $opt::pipe) {
last;
}
debug("run", "Running jobs before on ", $sshlogin->string(), ": ",
$sshlogin->jobs_running(), "\n");
if ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
# The host has a free slot. Skip it in this round if the load is too
# high, it is swapping, the previous login was too recent, or --delay
# has not yet passed since the newest job start.
if($opt::load and $sshlogin->loadavg_too_high()) {
next;
}
if($opt::noswap and $sshlogin->swapping()) {
next;
}
if($sshlogin->too_fast_remote_login()) {
next;
}
if($opt::delay and $opt::delay > ::now() - $Global::newest_starttime) {
next;
}
debug("run", $sshlogin->string(), " has ", $sshlogin->jobs_running(),
" out of ", $sshlogin->max_jobs_running(),
" jobs running. Start another.\n");
if(start_another_job($sshlogin) == 0) {
debug("run","No jobs started on ", $sshlogin->string(), "\n");
next;
}
$sshlogin->inc_jobs_running();
$sshlogin->set_last_login_at(::now());
$jobs_started++;
$jobs_started_this_round++;
}
debug("run","Running jobs after on ", $sshlogin->string(), ": ",
$sshlogin->jobs_running(), " of ",
$sshlogin->max_jobs_running(), "\n");
}
} while($jobs_started_this_round);
return $jobs_started;
}
}
{
# Counts how often the "No more file handles" warning was wanted;
# the warning itself is only printed the first time.
my $no_more_file_handles_warned;
# Take the next runnable job from the queue and start it on $sshlogin.
#
# Jobs for which is_already_in_joblog() is true, and (with --results
# together with --resume) jobs for which is_already_in_results() is
# true, are skipped.
# If the job cannot be started it is put back in the queue and the job
# limit of $sshlogin is lowered by one; when the limit is already 1 or
# less, an error is printed and ::wait_and_exit(255) is called.
#
# Input: $sshlogin - host object to start the job on
# Returns: 1 if a job was started, 0 otherwise
sub start_another_job {
my $sshlogin = shift;
if(enough_file_handles()) {
if($Global::JobQueue->empty() and not $opt::pipe) {
debug("start", "Not starting: JobQueue empty\n");
return 0;
} else {
my $job;
# Fetch jobs until one is found that still has to be run.
do {
$job = get_job_with_sshlogin($sshlogin);
if(not defined $job) {
debug("start", "Not starting: no jobs available for ",
$sshlogin->string(), "\n");
return 0;
}
} while ($job->is_already_in_joblog()
or
($opt::results and $opt::resume and $job->is_already_in_results()));
debug("start", "Command to run on '", $job->sshlogin()->string(), "': '",
$job->replaced(),"'\n");
if($job->start()) {
# With --pipe the started job is also remembered in @Global::virgin_jobs.
if($opt::pipe) {
push(@Global::virgin_jobs,$job);
}
debug("start", "Started as seq ", $job->seq(),
" pid:", $job->pid(), "\n");
return 1;
} else {
# The job could not be started: put it back and lower the job limit.
$Global::JobQueue->unget($job);
my $max = $sshlogin->max_jobs_running();
if($max > 1) { $max--; } else {
::error("No more processes: cannot run a single job. Something is wrong.\n");
::wait_and_exit(255);
}
$sshlogin->set_max_jobs_running($max);
# Pause for a random time below 300 (::usleep takes milliseconds)
# before reporting the lowered limit.
::usleep(rand()*300);
::warning("No more processes: ",
"Decreasing number of running jobs to $max. ",
"Raising ulimit -u or /etc/security/limits.conf may help.\n");
return 0;
}
}
} else {
# Post-increment: the warning is printed only while the counter is still 0.
$no_more_file_handles_warned++ or
::warning("No more file handles. ",
"Raising ulimit -n or /etc/security/limits.conf may help.\n");
return 0;
}
}
}
$opt::min_progress_interval = 0;

# Prepare for printing progress information.
#
# Turns on autoflush for the currently selected output handle and,
# when not running on a terminal, sets $opt::min_progress_interval
# to 30.
#
# Uses: $Global::is_terminal, $opt::bar
# Returns: a list of two strings to print before the progress lines:
#          ("","") with --bar, otherwise a legend line followed by the
#          worker list from progress()
sub init_progress {
    $| = 1;
    $opt::min_progress_interval = 30 unless $Global::is_terminal;
    return ("", "") if $opt::bar;

    my %report = progress();
    return ("\nComputers / CPU cores / Max jobs to run\n",
            $report{'workerlist'});
}
# Run until all running jobs have finished and no more jobs are to be
# started, printing --progress information on $Global::original_stderr
# along the way.
#
# Uses: $opt::progress, $opt::pipe, $Global::total_running,
#       $Global::max_jobs_running, $Global::JobQueue, %Global::host,
#       %Global::running, $Global::is_terminal, $Global::left,
#       $Global::start_no_new_jobs
sub drain_job_queue {
if($opt::progress) {
print $Global::original_stderr init_progress();
}
# $last_header: the header printed most recently (reprinted on change).
# $sleep: value handed to ::reap_usleep(), which returns the next one.
# $last_left: value of $Global::left at the previous progress report.
# $last_progress_time: when $Global::left was last seen to decrease.
# $ps_reported: set once the process listing for a stall was printed.
my $last_header="";
my $sleep = 0.2;
my $last_left = 1000000000;
my $last_progress_time = 0;
my $ps_reported = 0;
do {
while($Global::total_running > 0) {
debug($Global::total_running, "==", scalar
keys %Global::running," slots: ", $Global::max_jobs_running);
# With --pipe: close the handle fh(0,"w") of every running job.
if($opt::pipe) {
for my $job (values %Global::running) {
close $job->fh(0,"w");
}
}
# On a terminal report every round; otherwise only when at least
# 30 seconds have passed since $last_progress_time.
if($opt::progress and
($Global::is_terminal or (time() - $last_progress_time) >= 30)) {
my %progress = progress();
if($last_header ne $progress{'header'}) {
print $Global::original_stderr "\n", $progress{'header'}, "\n";
$last_header = $progress{'header'};
}
# On a terminal the status line is rewritten in place with "\r".
if ($Global::is_terminal) {
print $Global::original_stderr "\r",$progress{'status'};
}
if ($last_left > $Global::left) {
# The number of jobs left went down since the last report.
# NOTE(review): progress() assigns $Global::left only when --eta
# is given - confirm what this comparison sees without --eta.
if (not $Global::is_terminal) {
print $Global::original_stderr $progress{'status'},"\n";
}
$last_progress_time = time();
$ps_reported = 0;
} elsif (not $ps_reported and (time() - $last_progress_time) >= 60) {
# No decrease for 60 seconds: print a process listing once, using
# ps_with_stack from the script's directory or else `ps -wwf`.
print $Global::original_stderr "\n";
my $script_dir = ::dirname($0);
system("$script_dir/ps_with_stack || ps -wwf");
$ps_reported = 1;
}
$last_left = $Global::left;
flush $Global::original_stderr;
}
# Free slots and queued jobs: start more; shorten the sleep if any started.
if($Global::total_running < $Global::max_jobs_running
and not $Global::JobQueue->empty()) {
if(start_more_jobs() > 0) {
$sleep = $sleep/2+0.001;
}
}
$sleep = ::reap_usleep($sleep);
}
# Nothing is running, but jobs are still queued.
if(not $Global::JobQueue->empty()) {
if(not %Global::host) {
::error("There are no hosts left to run on.\n");
::wait_and_exit(255);
}
start_more_jobs();
$sleep = ::reap_usleep($sleep);
if($Global::max_jobs_running == 0) {
::warning("There are no job slots available. Increase --jobs.\n");
}
}
# Continue while jobs are running, or while new jobs may still be
# started and the queue is not empty.
} while ($Global::total_running > 0
or
not $Global::start_no_new_jobs and not $Global::JobQueue->empty());
# Print the final status line.
if($opt::progress) {
my %progress = progress();
print $Global::original_stderr $opt::progress_sep, $progress{'status'}, "\n";
flush $Global::original_stderr;
}
}
# Flip --progress on or off (installed as the USR2 handler by
# init_run_jobs()).
#
# When progress becomes enabled the texts from init_progress() are
# printed on $Global::original_stderr.
sub toggle_progress {
    $opt::progress = !$opt::progress;
    print $Global::original_stderr init_progress() if $opt::progress;
}
# Build the texts used by --progress.
#
# With --bar the status is the progress bar from bar() and the other
# fields are empty. Otherwise the status holds one entry per host in
# %Global::host, rendered in the most detailed layout that fits in
# terminal_columns() characters; if no layout fits, the most compact
# one is used. Each layout is tried first with the host names and then
# with the (shorter) host numbers from the worker list.
#
# The percentage of started jobs is reported as 0 while
# $Global::total_started is 0, so no layout can divide by zero.
#
# Uses: $opt::bar, $opt::eta, %Global::host, $Global::total_started
# Sets: $Global::left (only with --eta)
# Returns: a hash-style list with the keys
#   "workerlist" => one line per host: number:name / ncpus / max jobs
#   "header"     => legend describing the fields used in status
#   "status"     => optional ETA text followed by the per-host entries
sub progress {
    if($opt::bar) {
        return ("workerlist" => "", "header" => "", "status" => bar());
    }
    my $eta = "";
    if($opt::eta) {
        my($total, $completed, $left, $pctcomplete, $avgtime, $this_eta) =
            compute_eta();
        $eta = sprintf("ETA: %ds Left: %d AVG: %.2fs ",
                       $this_eta, $left, $avgtime);
        $Global::left = $left;
    }
    my $termcols = terminal_columns();
    my @workers = sort keys %Global::host;
    # Display name of each host: the local host ":" is shown as "local".
    my %sshlogin = map { $_ eq ":" ? ($_=>"local") : ($_=>$_) } @workers;
    my $workerno = 1;
    my %workerno = map { ($_=>$workerno++) } @workers;
    my $workerlist = "";
    for my $w (@workers) {
        $workerlist .=
            $workerno{$w}.":".$sshlogin{$w} ." / ".
            ($Global::host{$w}->ncpus() || "-")." / ".
            $Global::host{$w}->max_jobs_running()."\n";
    }

    # Collect the numbers of every host once; all layouts share them.
    my (%running, %completed, %pct_started, %avg_seconds);
    for my $w (@workers) {
        my $host = $Global::host{$w};
        $completed{$w} = $host->jobs_completed() || 0;
        $running{$w} = $host->jobs_running();
        # Nothing started yet means there is nothing to take a percentage
        # of: report 0 instead of dividing by zero.
        $pct_started{$w} = $Global::total_started
            ? ($running{$w} + $completed{$w}) * 100 / $Global::total_started
            : 0;
        $avg_seconds{$w} = $completed{$w}
            ? (time() - $^T) / $completed{$w}
            : 0;
    }

    # Layouts from most to least detailed: [ header, field renderer ].
    my @layouts = (
        [ "Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete",
          sub { my $w = shift;
                sprintf("%d/%d/%d%%/%.1fs ", $running{$w}, $completed{$w},
                        $pct_started{$w}, $avg_seconds{$w}) } ],
        [ "Computer:jobs running/jobs completed/%of started jobs",
          sub { my $w = shift;
                sprintf("%d/%d/%d%%", $running{$w}, $completed{$w},
                        $pct_started{$w}) } ],
        [ "Computer:jobs running/jobs completed",
          sub { my $w = shift;
                sprintf("%d/%d", $running{$w}, $completed{$w}) } ],
        [ "Computer:jobs completed",
          sub { my $w = shift;
                sprintf("%d", $completed{$w}) } ],
    );

    my ($status, $header) = ("", "");
  LAYOUT:
    for my $layout (@layouts) {
        my ($layout_header, $render_fields) = @$layout;
        # Host names first, host numbers second.
        for my $label_of (\%sshlogin, \%workerno) {
            $header = $layout_header;
            $status = $eta .
                join(" ", map { $label_of->{$_} . ":" . $render_fields->($_) }
                     @workers);
            last LAYOUT if length $status <= $termcols;
        }
    }
    return ("workerlist" => $workerlist, "header" => $header, "status" => $status);
}
{
    # Kept between calls: total number of jobs, the time the first
    # completed job was noticed, and the smoothed average time per job.
    my ($job_total, $first_seen_done, $smoothed);

    # Estimate the remaining run time from the jobs completed so far.
    #
    # Uses: $Global::JobQueue, %Global::host
    # Returns: ($total, $completed, $left, $pctcomplete, $avgtime, $eta)
    #   $pctcomplete is a fraction (completed / total), $eta is whole
    #   seconds; the last three are 0 while no job has completed
    sub compute_eta {
        $job_total ||= $Global::JobQueue->total_jobs();

        my $done = 0;
        for (values %Global::host) {
            $done += $_->jobs_completed();
        }
        my $remaining = $job_total - $done;
        return ($job_total, $done, $remaining, 0, 0, 0) unless $done;

        my $fraction = $done / $job_total;
        $first_seen_done ||= time;
        my $elapsed = (time - $first_seen_done);
        my $mean = $elapsed / $done;
        $smoothed ||= $mean;
        # Weight the new average by the completed fraction.
        $smoothed = (1 - $fraction) * $smoothed +
            $fraction * $mean;
        my $seconds_left = int($remaining * $smoothed);
        return ($job_total, $done, $remaining, $fraction, $mean, $seconds_left);
    }
}
{
# Escape sequences, set on the first call to bar():
# $rev = ESC [7m, $reset = ESC [0m.
my ($rev,$reset);
# Build the status text used by --bar.
#
# The returned string is made of "\r"-separated parts: a "# <eta> sec
# <arg>" line, the percentage completed, and the bar itself: the bar
# text padded to the terminal width, with $rev in front and $reset
# inserted after the completed share of the width.
# NOTE(review): the first two parts look intended for a consumer such
# as zenity (going by the variable name) - confirm.
#
# Uses: compute_eta(), terminal_columns(), $Global::newest_job
# Returns: the status string
sub bar {
$rev ||= "\033[7m";
$reset ||= "\033[0m";
my($total, $completed, $left, $pctcomplete, $avgtime, $eta) =
compute_eta();
# Text describing the newest job, or "" when there is none yet.
my $arg = $Global::newest_job ?
$Global::newest_job->{'commandline'}->replace_placeholders(["\257<\257>"],0,0) : "";
# Delete the listed characters from the text. As tr has no character
# class syntax, the literal '[' and ']' are part of the deleted set.
$arg =~ tr/[\011-\016\033\302-\365]//d;
my $bar_text =
sprintf("%d%% %d:%d=%ds %s",
$pctcomplete*100, $completed, $left, $eta, $arg);
my $terminal_width = terminal_columns();
# Pad or cut the text to exactly the terminal width.
my $s = sprintf("%-${terminal_width}s",
substr($bar_text." "x$terminal_width,
0,$terminal_width));
# Number of columns that correspond to the completed fraction.
my $width = int($terminal_width * $pctcomplete);
# 4-argument-style substr as lvalue with length 0: insert $reset at $width.
substr($s,$width,0) = $reset;
my $zenity = sprintf("%-${terminal_width}s",
substr("# $eta sec $arg",
0,$terminal_width));
$s = "\r" . $zenity . "\r" . $pctcomplete*100 . "\r" . $rev . $s . $reset;
return $s;
}
}
{
    # Cached width and the time it was determined.
    my ($width, $checked_at);

    # Get the number of columns of the terminal.
    #
    # The value is looked up again when none is cached or the cached
    # one is from an earlier second: first $ENV{'COLUMNS'}, then the
    # output of `resize`, and finally a default of 80.
    #
    # Returns: number of columns
    sub terminal_columns {
        my $stale = (not $width or $checked_at < time());
        if($stale) {
            $checked_at = time();
            $width = $ENV{'COLUMNS'};
            if(not $width) {
                my $resize = qx{ resize 2>/dev/null };
                if($resize =~ /COLUMNS=(\d+);/) {
                    $width = $1;
                }
            }
            $width ||= 80;
        }
        return $width;
    }
}
# Get the next job from $Global::JobQueue that may run on $sshlogin.
#
# Input: $sshlogin - host object the job is meant for
# Returns: a job object with its sshlogin set to $sshlogin, or undef
#          when no suitable job is available
sub get_job_with_sshlogin {
my $sshlogin = shift;
my $job = undef;
if ($opt::hostgroups) {
# Take jobs off the queue until one is found whose hostgroups match
# this host; the jobs passed over are put back afterwards.
my @other_hostgroup_jobs = ();
while($job = $Global::JobQueue->get()) {
if($sshlogin->in_hostgroups($job->hostgroups())) {
last;
} else {
push @other_hostgroup_jobs, $job;
}
}
$Global::JobQueue->unget(@other_hostgroup_jobs);
if(not defined $job) {
return undef;
}
} else {
$job = $Global::JobQueue->get();
if(not defined $job) {
::debug("start", "No more jobs: JobQueue empty\n");
return undef;
}
}
my $clean_command = $job->replaced();
# A command consisting of whitespace only is not run: the job is
# dropped and the next one is tried.
if($clean_command =~ /^\s*$/) {
if(not $Global::JobQueue->empty()) {
return get_job_with_sshlogin($sshlogin);
} else {
return undef;
}
}
$job->set_sshlogin($sshlogin);
# With --retries: the job has already failed on this host.
if($opt::retries and $clean_command and
$job->failed_here()) {
my ($no_of_failed_sshlogins,$min_failures) = $job->min_failed();
if($no_of_failed_sshlogins == grep { $_->max_jobs_running() > 0 } values %Global::host
and $job->failed_here() == $min_failures) {
# The job has failed on as many hosts as there are hosts with job
# slots, and its failure count here equals $min_failures:
# fall through and return it for this host again.
} else {
# Otherwise fetch another job for this host first, then put this
# one back in the queue.
my $nextjob;
if(not $Global::JobQueue->empty()) {
no warnings 'recursion';
$nextjob = get_job_with_sshlogin($sshlogin);
}
$Global::JobQueue->unget($job);
return $nextjob;
}
}
return $job;
}
sub __REMOTE_SSH__ {}

# Read every given --sshloginfile into @Global::sshlogin.
#
# Input: @_ - file names or the shorthands understood by
#             expand_slf_shorthand()
sub read_sshloginfiles {
    foreach my $shorthand (@_) {
        my $path = expand_slf_shorthand($shorthand);
        read_sshloginfile($path);
    }
}
# Translate an --sshloginfile argument into the file name to read.
#
# Input: $file - one of
#   "-"   returned unchanged
#   ".."  $HOME/.parallel/sshloginfile
#   "."   /etc/parallel/sshloginfile
#   name  returned unchanged if readable, else $HOME/.parallel/name
#         if that is readable, else an error is printed and
#         ::wait_and_exit(255) is called
# Returns: the file name to read
sub expand_slf_shorthand {
    my $file = shift;

    return $file if $file eq "-";
    return $ENV{'HOME'}."/.parallel/sshloginfile" if $file eq "..";
    return "/etc/parallel/sshloginfile" if $file eq ".";
    return $file if -r $file;

    # Not readable as given: look in the configuration directory.
    my $in_config_dir = $ENV{'HOME'}."/.parallel/".$file;
    if(-r $in_config_dir) {
        $file = $in_config_dir;
    } else {
        ::error("Cannot open $file.\n");
        ::wait_and_exit(255);
    }
    return $file;
}
# Append the sshlogins listed in a file to @Global::sshlogin.
#
# Lines that are empty, contain only whitespace, or start with '#'
# (optionally preceded by whitespace) are skipped.
#
# Input: $file - file name, or "-" to read from STDIN (which is left
#                open afterwards)
# Exits via ::wait_and_exit(255) if the file cannot be opened.
sub read_sshloginfile {
    my $file = shift;
    my $in_fh;
    ::debug("init","--slf ",$file);

    my $from_stdin = ($file eq "-");
    if($from_stdin) {
        $in_fh = *STDIN;
    } elsif(not open($in_fh, "<", $file)) {
        ::error("Cannot open $file.\n");
        ::wait_and_exit(255);
    }

    while(<$in_fh>) {
        chomp;
        next if /^\s*#/ or /^\s*$/;
        push @Global::sshlogin, $_;
    }

    close $in_fh unless $from_stdin;
}
# Turn the strings in @Global::sshlogin into SSHLogin objects stored in
# %Global::host (keyed by the object's string()).
#
# Uses: @Global::sshlogin, %Global::host, $opt::transfer, @opt::return,
#       $opt::cleanup, @opt::basefile, @opt::trc
# Sets: %Global::host, $Global::minimal_command_line_length
sub parse_sshlogin {
my @login;
# No sshlogin given: run on the local host, written ":".
if(not @Global::sshlogin) { @Global::sshlogin = (":"); }
for my $sshlogin (@Global::sshlogin) {
# An entry may hold several comma separated logins.
for my $s (split /,/, $sshlogin) {
if ($s eq ".." or $s eq "-") {
# Shorthand for a file. read_sshloginfile() pushes the lines it
# reads onto @Global::sshlogin, the array being looped over here.
read_sshloginfile(expand_slf_shorthand($s));
} else {
push (@login, $s);
}
}
}
# Start value; lowered below to the smallest maxlength of the new hosts.
$Global::minimal_command_line_length = 8_000_000;
my @allowed_hostgroups;
for my $ncpu_sshlogin_string (::uniq(@login)) {
my $sshlogin = SSHLogin->new($ncpu_sshlogin_string);
my $sshlogin_string = $sshlogin->string();
if($sshlogin_string eq "") {
# An entry with an empty login string only contributes its hostgroups.
push @allowed_hostgroups, $sshlogin->hostgroups();
next;
}
if($Global::host{$sshlogin_string}) {
# The host is already known: take over a given ncpus value and set
# its job limit to undef instead of replacing the object.
debug("run","Already seen $sshlogin_string\n");
if($sshlogin->{'ncpus'}) {
$Global::host{$sshlogin_string}->set_ncpus($sshlogin->ncpus());
}
$Global::host{$sshlogin_string}->set_max_jobs_running(undef);
next;
}
# The local host gets the full command length limit, other hosts half.
if($sshlogin_string eq ":") {
$sshlogin->set_maxlength(Limits::Command::max_length());
} else {
$sshlogin->set_maxlength(int(Limits::Command::max_length()/2));
}
$Global::minimal_command_line_length =
::min($Global::minimal_command_line_length, $sshlogin->maxlength());
$Global::host{$sshlogin_string} = $sshlogin;
}
# Hostgroups were given: remove the hosts that are in none of them.
if(@allowed_hostgroups) {
while (my ($string, $sshlogin) = each %Global::host) {
if(not $sshlogin->in_hostgroups(@allowed_hostgroups)) {
delete $Global::host{$string};
}
}
}
# Warn about file transfer options when there is no remote host
# (only the first matching option is mentioned).
if($opt::transfer or @opt::return or $opt::cleanup or @opt::basefile) {
if(not remote_hosts()) {
if(@opt::trc) {
::warning("--trc ignored as there are no remote --sshlogin.\n");
} elsif (defined $opt::transfer) {
::warning("--transfer ignored as there are no remote --sshlogin.\n");
} elsif (@opt::return) {
::warning("--return ignored as there are no remote --sshlogin.\n");
} elsif (defined $opt::cleanup) {
::warning("--cleanup ignored as there are no remote --sshlogin.\n");
} elsif (@opt::basefile) {
::warning("--basefile ignored as there are no remote --sshlogin.\n");
}
}
}
}
# List the hosts that are not the local host ":".
#
# Uses: %Global::host
# Returns: the keys of %Global::host that do not match /^:$/
#          (in scalar context: how many there are)
sub remote_hosts {
    return grep { not /^:$/ } keys %Global::host;
}
# Transfer the --basefile files to every remote host.
#
# One rsync_transfer_cmd() command per host and file is put in the
# background with "&"; the combined shell command ends with "wait;",
# is run, and its output is printed.
#
# Uses: %Global::host, @opt::basefile, $opt::workdir
# Exits via ::wait_and_exit(255) when a relative basefile is combined
# with the work dir "...".
sub setup_basefile {
    my $workdir;
    my @transfers;
    for my $sshlogin (values %Global::host) {
        next if $sshlogin->string() eq ":";
        for my $file (@opt::basefile) {
            if($file !~ m:^/: and $opt::workdir eq "...") {
                ::error("Work dir '...' will not work with relative basefiles\n");
                ::wait_and_exit(255);
            }
            # The work dir is only determined when first needed.
            $workdir ||= Job->new("")->workdir();
            push @transfers, $sshlogin->rsync_transfer_cmd($file,$workdir) . "&";
        }
    }
    my $cmd = join("", @transfers) . "wait;";
    debug("init", "basesetup: $cmd\n");
    print `$cmd`;
}
# Remove the --basefile files from every remote host.
#
# One cleanup_cmd() command per host and file is put in the background
# with "&"; the combined shell command ends with "wait;", is run, and
# its output is printed.
#
# Uses: %Global::host, @opt::basefile
sub cleanup_basefile {
    my @removals;
    my $workdir = Job->new("")->workdir();
    for my $sshlogin (values %Global::host) {
        next if $sshlogin->string() eq ":";
        for my $file (@opt::basefile) {
            push @removals, $sshlogin->cleanup_cmd($file,$workdir)."&";
        }
    }
    my $cmd = join("", @removals) . "wait;";
    debug("init", "basecleanup: $cmd\n");
    print `$cmd`;
}
# Probe every remote host in %Global::host and remove the ones that
# are down.
#
# For each remote host four commands are run through a separate
# invocation of this program ($0): `parallel --number-of-cores`,
# `parallel --number-of-cpus`, `parallel --max-line-length-allowed`
# and `echo`. The results are stored in the host objects.
#
# Uses: %Global::host, $Global::envvar, $opt::use_cpus_instead_of_cores
# Sets: %Global::host (down hosts deleted; ncpus, time_to_login and
#       maxlength set), $Global::minimal_command_line_length
sub filter_hosts {
my(@cores, @cpus, @maxline, @echo);
my $envvar = ::shell_quote_scalar($Global::envvar);
# Build the probe commands: "<host>\t<command>", each ending in "\n\0".
while (my ($host, $sshlogin) = each %Global::host) {
if($host eq ":") { next }
# "true $host;" puts the host name in the command itself; it is
# extracted again from the joblog line below.
my $sshcmd = "true $host;" . $sshlogin->sshcommand()." ".$sshlogin->serverlogin();
push(@cores, $host."\t".$sshcmd." ".$envvar." parallel --number-of-cores\n\0");
push(@cpus, $host."\t".$sshcmd." ".$envvar." parallel --number-of-cpus\n\0");
push(@maxline, $host."\t".$sshcmd." ".$envvar." parallel --max-line-length-allowed\n\0");
push(@echo, $host."\t".$sshcmd." echo\n\0");
}
my ($fh, $tmpfile) = ::tmpfile(SUFFIX => ".ssh");
print $fh @cores, @cpus, @maxline, @echo;
close $fh;
# Run the probes with this program itself. The joblog goes to stdout
# (--joblog -), mixed with the tagged output of the commands.
my $cmd = "cat $tmpfile | $0 -j0 --timeout 5 -s 16000 --joblog - --plain --delay 0.1 --retries 3 --tag --tagstring {1} -0 --colsep '\t' -k eval {2} 2>/dev/null";
::debug("init", $cmd, "\n");
open(my $host_fh, "-|", $cmd) || ::die_bug("parallel host check: $cmd");
my (%ncores, %ncpus, %time_to_login, %maxlen, %echo, @down_hosts);
my $prepend = "";
while(<$host_fh>) {
# A line ending in ' is joined with the line that follows it.
if(/\'$/) {
$prepend .= $_;
next;
}
$_ = $prepend . $_;
$prepend = "";
chomp;
my @col = split /\t/, $_;
if(defined $col[6]) {
# At least 7 tab separated columns: treated as a joblog line.
# Skip the joblog header line.
if($col[0] eq "Seq" and $col[1] eq "Host" and
$col[2] eq "Starttime") {
next;
}
# Column 8 holds the command; recover the host from "eval true <host>;".
$col[8] =~ /eval true..([^;]+).;/ or ::die_bug("col8 does not contain host: $col[8]");
my $host = $1;
$host =~ tr/\\//d;
$Global::host{$host} or next;
# NOTE(review): columns 6 and 7 are presumably the exit value and
# the signal of the joblog - confirm against the joblog format.
if($col[6] eq "255" or $col[7] eq "15") {
::debug("init", "--filtered $host\n");
push(@down_hosts, $host);
@down_hosts = uniq(@down_hosts);
} elsif($col[6] eq "127") {
::warning("Could not figure out ",
"number of cpus on $host. Using 1.\n");
$ncores{$host} = 1;
$ncpus{$host} = 1;
$maxlen{$host} = Limits::Command::max_length();
} elsif($col[0] =~ /^\d+$/ and $Global::host{$host}) {
# Keep the smallest value of column 3 seen for the host.
$time_to_login{$host} = ::min($time_to_login{$host},$col[3]);
} else {
::die_bug("host check unmatched long jobline: $_");
}
} elsif($Global::host{$col[0]}) {
# Tagged command output "<host>\t<value>": the values of a host are
# assigned in order of arrival to ncores, ncpus, maxlen and echo.
if(not $ncores{$col[0]}) {
$ncores{$col[0]} = $col[1];
} elsif(not $ncpus{$col[0]}) {
$ncpus{$col[0]} = $col[1];
} elsif(not $maxlen{$col[0]}) {
$maxlen{$col[0]} = $col[1];
} elsif(not $echo{$col[0]}) {
$echo{$col[0]} = $col[1];
} elsif(m/perl: warning:|LANGUAGE =|LC_ALL =|LANG =|are supported and installed/) {
# Locale warnings from perl on the remote host are ignored.
} else {
::die_bug("host check too many col0: $_");
}
} else {
::die_bug("host check unmatched short jobline ($col[0]): $_");
}
}
close $host_fh;
# The temporary file is kept when debugging is on.
$Global::debug or unlink $tmpfile;
delete @Global::host{@down_hosts};
@down_hosts and ::warning("Removed @down_hosts\n");
# Store the collected values in the remaining remote hosts.
$Global::minimal_command_line_length = 8_000_000;
while (my ($sshlogin, $obj) = each %Global::host) {
if($sshlogin eq ":") { next }
$ncpus{$sshlogin} or ::die_bug("ncpus missing: ".$obj->serverlogin());
$ncores{$sshlogin} or ::die_bug("ncores missing: ".$obj->serverlogin());
$time_to_login{$sshlogin} or ::die_bug("time_to_login missing: ".$obj->serverlogin());
$maxlen{$sshlogin} or ::die_bug("maxlen missing: ".$obj->serverlogin());
if($opt::use_cpus_instead_of_cores) {
$obj->set_ncpus($ncpus{$sshlogin});
} else {
$obj->set_ncpus($ncores{$sshlogin});
}
$obj->set_time_to_login($time_to_login{$sshlogin});
$obj->set_maxlength($maxlen{$sshlogin});
# Half of the host's maximal line length counts towards the minimum.
$Global::minimal_command_line_length =
::min($Global::minimal_command_line_length,
int($maxlen{$sshlogin}/2));
::debug("init", "Timing from -S:$sshlogin ncpus:",$ncpus{$sshlogin},
" ncores:", $ncores{$sshlogin},
" time_to_login:", $time_to_login{$sshlogin},
" maxlen:", $maxlen{$sshlogin},
" min_max_len:", $Global::minimal_command_line_length,"\n");
}
}
# Run the command on every host in %Global::host (--onall / --nonall).
#
# The input sources are copied to temporary files. An outer invocation
# of this program ($0 ... -j0) is then fed one command line per host;
# each of those lines runs this program again with -j1 and -S <host>
# on the copied input files.
#
# Input: @_ - the command to run
# Uses: @fhlist, %Global::host, several $opt:: values, $Global::joblog
# Sets: $Global::exitstatus
sub onall {
# Helper: returns undef if $joblog is undef, otherwise the name of a
# new temporary file with the suffix ".log".
sub tmp_joblog {
my $joblog = shift;
if(not defined $joblog) {
return undef;
}
my ($fh, $tmpfile) = ::tmpfile(SUFFIX => ".log");
close $fh;
return $tmpfile;
}
my @command = @_;
if($Global::quoting) {
@command = shell_quote_empty(@command);
}
# Copy every input source to a temporary file, so that it can be
# read once per host.
my @argfiles = ();
for my $fh (@fhlist) {
my ($outfh, $name) = ::tmpfile(SUFFIX => ".all", UNLINK => 1);
print $outfh (<$fh>);
close $outfh;
push @argfiles, $name;
}
if(@opt::basefile) { setup_basefile(); }
# Options passed on to the outer invocation.
my $options =
join(" ",
((defined $opt::jobs) ? "-P $opt::jobs" : ""),
((defined $opt::linebuffer) ? "--linebuffer" : ""),
((defined $opt::ungroup) ? "-u" : ""),
((defined $opt::group) ? "-g" : ""),
((defined $opt::keeporder) ? "--keeporder" : ""),
((defined $opt::D) ? "-D $opt::D" : ""),
((defined $opt::plain) ? "--plain" : ""),
((defined $opt::max_chars) ? "--max-chars ".$opt::max_chars : ""),
);
# Options passed on to the per-host invocations.
my $suboptions =
join(" ",
((defined $opt::ungroup) ? "-u" : ""),
((defined $opt::linebuffer) ? "--linebuffer" : ""),
((defined $opt::group) ? "-g" : ""),
((defined $opt::files) ? "--files" : ""),
((defined $opt::keeporder) ? "--keeporder" : ""),
((defined $opt::colsep) ? "--colsep ".shell_quote($opt::colsep) : ""),
((@opt::v) ? "-vv" : ""),
((defined $opt::D) ? "-D $opt::D" : ""),
((defined $opt::timeout) ? "--timeout ".$opt::timeout : ""),
((defined $opt::plain) ? "--plain" : ""),
((defined $opt::retries) ? "--retries ".$opt::retries : ""),
((defined $opt::max_chars) ? "--max-chars ".$opt::max_chars : ""),
((defined $opt::arg_sep) ? "--arg-sep ".$opt::arg_sep : ""),
((defined $opt::arg_file_sep) ? "--arg-file-sep ".$opt::arg_file_sep : ""),
(@opt::env ? map { "--env ".::shell_quote_scalar($_) } @opt::env : ""),
);
::debug("init", "| $0 $options\n");
# The outer invocation reads the per-host command lines from this pipe.
open(my $parallel_fh, "|-", "$0 --no-notice -j0 $options") ||
::die_bug("This does not run GNU Parallel: $0 $options");
my @joblogs;
for my $host (sort keys %Global::host) {
my $sshlogin = $Global::host{$host};
# With --joblog every host writes to its own temporary joblog.
my $joblog = tmp_joblog($opt::joblog);
if($joblog) {
push @joblogs, $joblog;
$joblog = "--joblog $joblog";
}
my $quad = $opt::arg_file_sep || "::::";
::debug("init", "$0 $suboptions -j1 $joblog ",
((defined $opt::tag) ?
"--tagstring ".shell_quote_scalar($sshlogin->string()) : ""),
" -S ", shell_quote_scalar($sshlogin->string())," ",
join(" ",shell_quote(@command))," $quad @argfiles\n");
print $parallel_fh "$0 $suboptions -j1 $joblog ",
((defined $opt::tag) ?
"--tagstring ".shell_quote_scalar($sshlogin->string()) : ""),
" -S ", shell_quote_scalar($sshlogin->string())," ",
join(" ",shell_quote(@command))," $quad @argfiles\n";
}
# Closing the pipe waits for the outer invocation; its exit status
# is left in $?.
close $parallel_fh;
$Global::exitstatus = $? >> 8;
debug("init", "--onall exitvalue ", $?);
if(@opt::basefile) { cleanup_basefile(); }
$Global::debug or unlink(@argfiles);
# NOTE(review): %seen is not used in the rest of this sub.
my %seen;
# Append each temporary joblog, without its first line, to
# $Global::joblog and remove the temporary file.
for my $joblog (@joblogs) {
open(my $fh, "<", $joblog) || ::die_bug("Cannot open tmp joblog $joblog");
<$fh>;
print $Global::joblog (<$fh>);
close $fh;
unlink($joblog);
}
}
sub __SIGNAL_HANDLING__ {}

# Install the INT and TERM handlers and remember all handlers.
#
# The INT and TERM handlers kill the tmux session p<pid> (with --tmux),
# remove the files listed as keys of %Global::unlink and exit with -1.
# After the handlers are copied to %Global::original_sig, TERM is
# replaced by a handler that does nothing.
#
# Sets: %SIG, %Global::original_sig
sub save_original_signal_handler {
    $SIG{TERM} ||= sub { exit 0; };
    for my $signal (qw(INT TERM)) {
        $SIG{$signal} = sub {
            if($opt::tmux) {
                qx { tmux kill-session -t p$$ };
            }
            unlink keys %Global::unlink;
            exit -1;
        };
    }
    %Global::original_sig = %SIG;
    $SIG{TERM} = sub {};
}
# Print the command of every running job on $Global::original_stderr,
# one per line, prefixed with the program name (installed as the USR1
# handler by init_run_jobs()).
#
# Uses: %Global::running, $Global::progname
sub list_running_jobs {
    foreach my $job (values %Global::running) {
        my $command = $job->replaced();
        print $Global::original_stderr "$Global::progname: ",$command,"\n";
    }
}
# Stop starting new jobs (reaction to SIGTERM, going by the message).
#
# Restores the TERM handler saved in %Global::original_sig, tells the
# user how many jobs are still running, lists them, and makes sure
# $Global::start_no_new_jobs is true.
sub start_no_new_jobs {
    $SIG{TERM} = $Global::original_sig{TERM};
    my $still_running = scalar(keys %Global::running);
    print $Global::original_stderr
        ("$Global::progname: SIGTERM received. No new jobs will be started.\n",
         "$Global::progname: Waiting for these ", $still_running,
         " jobs to finish. Send SIGTERM again to stop now.\n");
    list_running_jobs();
    $Global::start_no_new_jobs ||= 1;
}
# Collect all child processes that have exited, without blocking.
#
# For every exited job the exit status, exit signal and end time are
# stored in the job. Unless the job should be retried, its slot is
# released, its output is printed and a failure is handled. Finally
# the host counters are updated and start_more_jobs() is called.
#
# Uses: %Global::sshmaster, %Global::running, $Global::tty_taken,
#       $opt::timeout, $opt::halt_on_error, $opt::keeporder
# Returns: the number of child processes reaped
sub reaper {
my $stiff;
my $children_reaped = 0;
debug("run", "Reaper ");
# waitpid(-1, WNOHANG) returns a pid > 0 for each exited child.
while (($stiff = waitpid(-1, &WNOHANG)) > 0) {
$children_reaped++;
# Pids listed in %Global::sshmaster are not jobs.
if($Global::sshmaster{$stiff}) {
next;
}
my $job = $Global::running{$stiff};
# A pid that is not a known running job is ignored.
$job or next;
# $? holds the wait status: exit code in the high byte, signal in the
# low 7 bits.
$job->set_exitstatus($? >> 8);
$job->set_exitsignal($? & 127);
debug("run", "died (", $job->exitstatus(), "): ", $job->seq());
$job->set_endtime(::now());
if($stiff == $Global::tty_taken) {
$Global::tty_taken = 0;
}
if(not $job->should_be_retried()) {
# The job is done for good: hand back its slot.
push @Global::slots, $job->slot();
if($opt::timeout) {
$Global::timeoutq->update_delta_time($job->runtime());
}
# With --halt-on-error 2 a failed job is printed at once, even
# with --keeporder.
my $print_now = ($opt::halt_on_error and $opt::halt_on_error == 2
and $job->exitstatus());
if($opt::keeporder and not $print_now) {
print_earlier_jobs($job);
} else {
$job->print();
}
if($job->exitstatus()) {
process_failed_job($job);
}
}
my $sshlogin = $job->sshlogin();
$sshlogin->dec_jobs_running();
$sshlogin->inc_jobs_completed();
$Global::total_running--;
delete $Global::running{$stiff};
start_more_jobs();
}
debug("run", "done ");
return $children_reaped;
}
# Account for a failed job and apply --halt-on-error.
#
# Always increments $Global::exitstatus and $Global::total_failed.
# With --halt-on-error:
#   1, or a value below 1 when more than 3 jobs have failed and the
#   failed share of the started jobs exceeds that value:
#      report the job, stop starting new jobs and remember the job's
#      exit status in $Global::halt_on_error_exitstatus
#   2: report the job and exit at once with the job's exit status
#
# Input: $job - the job that failed
sub process_failed_job {
    my ($job) = @_;
    $Global::exitstatus++;
    $Global::total_failed++;
    return if not $opt::halt_on_error;

    my $failed_share_exceeded =
        ($opt::halt_on_error < 1 and $Global::total_failed > 3
         and
         $Global::total_failed / $Global::total_started > $opt::halt_on_error);

    if($opt::halt_on_error == 1 or $failed_share_exceeded) {
        my $still_running = scalar(keys %Global::running);
        print $Global::original_stderr
            ("$Global::progname: Starting no more jobs. ",
             "Waiting for ", $still_running,
             " jobs to finish. This job failed:\n",
             $job->replaced(),"\n");
        $Global::start_no_new_jobs ||= 1;
        $Global::halt_on_error_exitstatus = $job->exitstatus();
        return;
    }
    if($opt::halt_on_error == 2) {
        print $Global::original_stderr
            ("$Global::progname: This job failed:\n",
             $job->replaced(),"\n");
        exit ($job->exitstatus());
    }
}
{
    # Jobs that have finished but are not printed yet, keyed by sequence
    # number, and the sequence number that is to be printed next.
    my (%waiting, $next_seq);

    # Print finished jobs in sequence order (used with --keeporder).
    #
    # The given job is stored; then jobs are printed starting at the
    # next expected sequence number for as long as that number either
    # belongs to a stored job or is marked in the bit vector
    # $Global::job_already_run (such numbers are stepped over).
    #
    # Input: $job - the job that just finished
    sub print_earlier_jobs {
        my ($job) = @_;
        $waiting{$job->seq()} = $job;
        $next_seq ||= 1;
        debug("run", "Looking for: $next_seq ",
              "Current: ", $job->seq(), "\n");
        while(1) {
            my $ready = $waiting{$next_seq};
            last unless $ready or vec($Global::job_already_run,$next_seq,1);
            debug("run", "Found job end $next_seq");
            if($ready) {
                $ready->print();
                delete $waiting{$next_seq};
            }
            $next_seq++;
        }
    }
}
sub __USAGE__ {}

# Clean up the child processes and exit.
#
# With a true $error every running job is sent TERM twice. All pids in
# %Global::unkilled_children are killed with signal 9 and waited for.
# Finally one wait() is done before exiting.
#
# Input: $error - the exit status to exit with
sub wait_and_exit {
    my ($error) = @_;
    if($error) {
        foreach my $job (values %Global::running) {
            $job->kill("TERM");
            $job->kill("TERM");
        }
    }
    foreach my $pid (keys %Global::unkilled_children) {
        kill 9, $pid;
        waitpid($pid,0);
        delete $Global::unkilled_children{$pid};
    }
    wait();
    exit($error);
}
# Print the usage text and exit with status 255.
sub die_usage {
    ::usage();
    ::wait_and_exit(255);
}
# Print the short usage text on the currently selected output handle.
#
# Uses: $Global::progname
sub usage {
    my @lines = (
        "Usage:",
        "",
        "$Global::progname [options] [command [arguments]] < list_of_arguments",
        "$Global::progname [options] [command [arguments]] (::: arguments|:::: argfile(s))...",
        "cat ... | $Global::progname --pipe [options] [command [arguments]]",
        "",
        "-j n Run n jobs in parallel",
        "-k Keep same order",
        "-X Multiple arguments with context replace",
        "--colsep regexp Split input on regexp for positional replacements",
        "{} {.} {/} {/.} {#} {%} {= perl code =} Replacement strings",
        "{3} {3.} {3/} {3/.} {=3 perl code =} Positional replacement strings",
        "With --plus: {} = {+/}/{/} = {.}.{+.} = {+/}/{/.}.{+.} = {..}.{+..} =",
        " {+/}/{/..}.{+..} = {...}.{+...} = {+/}/{/...}.{+...}",
        "",
        "-S sshlogin Example: foo\@server.example.com",
        "--slf .. Use ~/.parallel/sshloginfile as the list of sshlogins",
        "--trc {}.bar Shorthand for --transfer --return {}.bar --cleanup",
        "--onall Run the given command with argument on all sshlogins",
        "--nonall Run the given command with no arguments on all sshlogins",
        "",
        "--pipe Split stdin (standard input) to multiple jobs.",
        "--recend str Record end separator for --pipe.",
        "--recstart str Record start separator for --pipe.",
        "",
        "See 'man $Global::progname' for details",
        "",
        "When using programs that use GNU Parallel to process data for publication please cite:",
        "",
        "O. Tange (2011): GNU Parallel - The Command-Line Power Tool,",
        ";login: The USENIX Magazine, February 2011:42-47.",
        "",
        "Or you can get GNU Parallel without this requirement by paying 10000 EUR.",
        "",
    );
    print join("\n", @lines);
}
# Print the citation notice on $Global::original_stderr.
#
# Nothing is printed with --no-notice or --plain, when
# $Global::original_stderr is not a terminal, or when the file
# $HOME/.parallel/will-cite exists.
sub citation_notice {
    my $silenced = ($opt::no_notice
                    or
                    $opt::plain
                    or
                    not -t $Global::original_stderr
                    or
                    -e $ENV{'HOME'}."/.parallel/will-cite");
    return if $silenced;

    my @notice = (
        "When using programs that use GNU Parallel to process data for publication please cite:\n",
        "\n",
        " O. Tange (2011): GNU Parallel - The Command-Line Power Tool,\n",
        " ;login: The USENIX Magazine, February 2011:42-47.\n",
        "\n",
        "This helps funding further development; and it won't cost you a cent.\n",
        "Or you can get GNU Parallel without this requirement by paying 10000 EUR.\n",
        "\n",
        "To silence this citation notice run 'parallel --bibtex' once or use '--no-notice'.\n\n",
    );
    print $Global::original_stderr @notice;
    flush $Global::original_stderr;
}
# Print a warning: "<progname>: Warning: <message>".
#
# The text goes to $Global::original_stderr, or to STDERR if that is
# not set; the program name defaults to "parallel".
#
# Input: @_ - the parts of the message (printed as given, no newline
#             is added)
sub warning {
    my @message = @_;
    my $out = $Global::original_stderr || *STDERR;
    my $label = $Global::progname || "parallel";
    print {$out} $label, ": Warning: ", @message;
}
# Print an error message: "<progname>: Error: <message>".
#
# The text goes to $Global::original_stderr, or to STDERR if that is
# not set; the program name defaults to "parallel". The program is not
# stopped.
#
# Input: @_ - the parts of the message (printed as given, no newline
#             is added)
sub error {
    my @message = @_;
    my $out = $Global::original_stderr || *STDERR;
    my $label = $Global::progname || "parallel";
    print {$out} $label, ": Error: ", @message;
}
# Report an internal error on STDERR and exit with status 255.
#
# Input: $bugid - text identifying the place that detected the problem
# Uses: $Global::progname, $Global::version
sub die_bug {
    my ($bugid) = @_;
    my @report = (
        "$Global::progname: This should not happen. You have found a bug.\n",
        "Please contact <parallel\@gnu.org> and include:\n",
        "* The version number: $Global::version\n",
        "* The bugid: $bugid\n",
        "* The command line being run\n",
        "* The files being read (put the files on a webserver if they are big)\n",
        "\n",
        "If you get the error on smaller/fewer files, please include those instead.\n",
    );
    print STDERR @report;
    ::wait_and_exit(255);
}
# Print the version, copyright and citation text on the currently
# selected output handle.
#
# Uses: $opt::tollef, $opt::gnu, $Global::progname, $Global::version
sub version {
    if($opt::tollef and not $opt::gnu) {
        print "WARNING: YOU ARE USING --tollef. IF THINGS ARE ACTING WEIRD USE --gnu.\n";
    }
    my @lines = (
        "GNU $Global::progname $Global::version",
        "Copyright (C) 2007,2008,2009,2010,2011,2012,2013,2014 Ole Tange and Free Software Foundation, Inc.",
        "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>",
        "This is free software: you are free to change and redistribute it.",
        "GNU $Global::progname comes with no warranty.",
        "",
        "Web site: http://www.gnu.org/software/${Global::progname}\n",
        "When using programs that use GNU Parallel to process data for publication please cite:\n",
        "O. Tange (2011): GNU Parallel - The Command-Line Power Tool, ",
        ";login: The USENIX Magazine, February 2011:42-47.\n",
        "Or you can get GNU Parallel without this requirement by paying 10000 EUR.\n",
    );
    print join("\n", @lines);
}
# Print the BibTeX entry to cite, then ask the user to type
# 'will cite' until the file $HOME/.parallel/will-cite exists.
#
# Typing 'will cite' (any case, anywhere in the line) creates
# $HOME/.parallel and the will-cite file.
#
# If standard input ends before the file exists, the program exits
# with status 255: without that check the loop would print the prompt
# forever, because reading at end-of-file returns undef immediately.
#
# Uses: $opt::tollef, $opt::gnu, $ENV{'HOME'}
sub bibtex {
    if($opt::tollef and not $opt::gnu) {
        print "WARNING: YOU ARE USING --tollef. IF THINGS ARE ACTING WEIRD USE --gnu.\n";
    }
    print join("\n",
               "When using programs that use GNU Parallel to process data for publication please cite:",
               "",
               "\@article{Tange2011a,",
               " title = {GNU Parallel - The Command-Line Power Tool},",
               " author = {O. Tange},",
               " address = {Frederiksberg, Denmark},",
               " journal = {;login: The USENIX Magazine},",
               " month = {Feb},",
               " number = {1},",
               " volume = {36},",
               " url = {http://www.gnu.org/s/parallel},",
               " year = {2011},",
               " pages = {42-47}",
               "}",
               "",
               "(Feel free to use \\nocite{Tange2011a})",
               "",
               "This helps funding further development.",
               "",
               "Or you can get GNU Parallel without this requirement by paying 10000 EUR.",
               ""
        );
    while(not -e $ENV{'HOME'}."/.parallel/will-cite") {
        print "\nType: 'will cite' and press enter.\n> ";
        my $input = <STDIN>;
        if(not defined $input) {
            # End of input (or a read error): nobody is left to answer.
            exit(255);
        }
        if($input =~ /will cite/i) {
            mkdir $ENV{'HOME'}."/.parallel";
            open (my $fh, ">", $ENV{'HOME'}."/.parallel/will-cite")
                || ::die_bug("Cannot write: ".$ENV{'HOME'}."/.parallel/will-cite");
            close $fh;
            print "\nThank you for your support. It is much appreciated. The citation\n",
                "notice is now silenced.\n";
        }
    }
}
# Print the command line length limits, followed by a notice that
# execution continues, on the currently selected output handle.
#
# Uses: Limits::Command::real_max_length(), Limits::Command::max_length()
sub show_limits {
    my @report = (
        "Maximal size of command: ",Limits::Command::real_max_length(),"\n",
        "Maximal used size of command: ",Limits::Command::max_length(),"\n",
        "\n",
        "Execution of will continue now, and it will try to read its input\n",
        "and run commands; if this is not what you wanted to happen, please\n",
        "press CTRL-D or CTRL-C\n",
    );
    print @report;
}
sub __GENERIC_COMMON_FUNCTION__ {}

# Remove duplicates from a list.
#
# The values are returned in the order in which they first appear in
# the input, which makes the result reproducible from run to run
# (taking the keys of a hash gives them in no particular order).
#
# Input: @_ - list of values (compared as strings)
# Returns: the distinct values; in scalar context how many there are
sub uniq {
    my %seen;
    return grep { not $seen{$_}++ } @_;
}
# Find the numerically smallest defined value.
#
# Input: @_ - list of numbers; undefined values are ignored
# Returns: the smallest value, or undef if there is no defined value
sub min {
    my $lowest;
    for my $candidate (@_) {
        next if not defined $candidate;
        # A candidate replaces the current value unless the current value
        # is strictly smaller.
        if(not defined $lowest or not ($lowest < $candidate)) {
            $lowest = $candidate;
        }
    }
    return $lowest;
}
# Find the numerically largest defined value.
#
# Input: @_ - list of numbers; undefined values are ignored
# Returns: the largest value, or undef if there is no defined value
sub max {
    my $highest;
    for my $candidate (@_) {
        next if not defined $candidate;
        # A candidate replaces the current value unless the current value
        # is strictly larger.
        if(not defined $highest or not ($highest > $candidate)) {
            $highest = $candidate;
        }
    }
    return $highest;
}
# Add up a list of numbers.
#
# Input: @_ - list of numbers; false values (undef, 0, "") are skipped
# Returns: the sum, 0 for an empty list
sub sum {
    my @terms = @_;
    my $total = 0;
    for my $term (@terms) {
        $total += $term if $term;
    }
    return $total;
}
# Replace a false value by 0.
#
# Input: $_[0] - any scalar
# Returns: the value itself if it is true, otherwise 0
sub undef_as_zero {
    my ($value) = @_;
    return $value || 0;
}
# Replace a false value by the empty string.
#
# Note that every false value is replaced, so 0 becomes "" as well.
#
# Input: $_[0] - any scalar
# Returns: the value itself if it is true, otherwise ""
sub undef_as_empty {
    my ($value) = @_;
    return $value || "";
}
{
    # The host name, determined on the first call.
    my $cached_name;

    # Get the name of this host.
    #
    # The name is the output of the `hostname` command without the
    # trailing newline, or "nohostname" if that is empty.
    #
    # Returns: the host name
    sub hostname {
        return $cached_name if $cached_name;
        chomp($cached_name = `hostname`);
        $cached_name ||= "nohostname";
        return $cached_name;
    }
}
# Find programs in the directories of $ENV{'PATH'}.
#
# Input: @_ - program names
# Returns: dir/program for every directory of $PATH in which the
#          program passes the -x test; grouped by program in the
#          order given, and within a program in $PATH order
sub which {
    my @found;
    for my $prg (@_) {
        for my $dir (split(":",$ENV{'PATH'})) {
            my $candidate = $dir."/".$prg;
            push @found, $candidate if -x $candidate;
        }
    }
    return @found;
}
{
# Built on the first call to parent_shell():
# $regexp matches a process name that is a known shell,
# %fakename lists further shell names to look for when the process is
# named "-sh" or "-csh".
my ($regexp,%fakename);
# Find the shell that is the closest ancestor of a process.
#
# Starting at $pid the process table from pid_table() is walked from
# child to parent until a process is found whose name looks like a
# shell and for which which() finds an executable.
#
# Input: $pid - process id to start from
# Returns: the path of the shell, or undef if none was found
sub parent_shell {
my $pid = shift;
if(not $regexp) {
my @shells = qw(ash bash csh dash fdsh fish fizsh ksh
ksh93 mksh pdksh posh rbash rush rzsh
sash sh static-sh tcsh yash zsh -sh -csh);
my $shell = "(?:".join("|",@shells).")";
# Two alternatives: "[shell]" (shell name in capture group 3), or the
# shell name after an optional path or "busybox " (capture group 6).
# The name must be followed by a space or the end of the string.
$regexp = '^((\[)('. $shell. ')(\])|(|\S+/|busybox )('. $shell. '))($| )';
%fakename = (
"-sh" => ["csh", "tcsh"],
"-csh" => ["tcsh", "csh"],
);
}
my ($children_of_ref, $parent_of_ref, $name_of_ref) = pid_table();
my $shellpath;
my $testpid = $pid;
# Walk towards the root of the process tree.
while($testpid) {
::debug("init", "shell? ". $name_of_ref->{$testpid}."\n");
# /o: the pattern is compiled only once; it never changes once set.
if($name_of_ref->{$testpid} =~ /$regexp/o) {
::debug("init", "which ".($3||$6)." => ");
# Look up the matched name, then the alternatives from %fakename,
# and take the first executable found.
$shellpath = (which($3 || $6,@{$fakename{$3 || $6}}))[0];
::debug("init", "shell path $shellpath\n");
$shellpath and last;
}
$testpid = $parent_of_ref->{$testpid};
}
return $shellpath;
}
}
{
# Maps the operating system name ($^O) to the shell command that lists
# pid, parent pid and command of all processes; filled on the first call.
my %pid_parentpid_cmd;
# Read the process table of the system.
#
# Runs the listing command for the current operating system on every
# call and parses each output line as "pid parent-pid command".
#
# Returns: three hash references
#   \%children_of - pid => array ref of the pids of its children
#   \%parent_of   - pid => pid of its parent
#   \%name_of     - pid => the command
# Calls ::die_bug() if no command is known for $^O or a line cannot
# be parsed.
sub pid_table {
if(not %pid_parentpid_cmd) {
# `ps -ef` piped through a perl one-liner that prints fields 1 and 2
# of each line followed by the rest of the line.
# NOTE(review): the one-liner appears to use the position of the
# command column header on the first line to cut off the leading
# columns - confirm against real `ps -ef` output.
my $sysv = q( ps -ef | perl -ane '1..1 and /^(.*)CO?MM?A?N?D/ and $s=length $1;).
q(s/^.{$s}//; print "@F[1,2] $_"' );
my $bsd = q(ps -o pid,ppid,command -ax);
%pid_parentpid_cmd =
(
'aix' => $sysv,
'cygwin' => $sysv,
'msys' => $sysv,
'dec_osf' => $sysv,
'darwin' => $bsd,
'dragonfly' => $bsd,
'freebsd' => $bsd,
'gnu' => $sysv,
'hpux' => $sysv,
'linux' => $sysv,
'mirbsd' => $bsd,
'netbsd' => $bsd,
'nto' => $sysv,
'openbsd' => $bsd,
'solaris' => $sysv,
'svr5' => $sysv,
);
}
$pid_parentpid_cmd{$^O} or ::die_bug("pid_parentpid_cmd for $^O missing");
my (@pidtable,%parent_of,%children_of,%name_of);
@pidtable = `$pid_parentpid_cmd{$^O}`;
# NOTE(review): $p is not used in the rest of this sub.
my $p=$$;
for (@pidtable) {
# Three fields: pid, parent pid, and the rest of the line as command.
/(\S+)\s+(\S+)\s+(\S+.*)/ or ::die_bug("pidtable format: $_");
$parent_of{$1} = $2;
push @{$children_of{$2}}, $1;
$name_of{$1} = $3;
}
return(\%children_of, \%parent_of, \%name_of);
}
}
# Reap finished children; if none were reaped, sleep $ms milliseconds.
# Input:
#   $ms = milliseconds to sleep if nothing was reaped
# Returns:
#   the sleep time to use next: about half if something was reaped,
#   otherwise 10% longer (no growth once $ms has reached 1000).
sub reap_usleep {
    my ($ms) = @_;
    return $ms/2+0.001 if reaper();
    $Global::timeoutq->process_timeouts() if $opt::timeout;
    usleep($ms);
    Job::exit_if_disk_full();
    if($opt::linebuffer) {
        # Print the running jobs while waiting
        for my $job (values %Global::running) {
            $job->print();
        }
    }
    return (($ms < 1000) ? ($ms * 1.1) : ($ms));
}
# Sleep this many milliseconds (fractions allowed).
# Input:
#   $ms = milliseconds to sleep
sub usleep {
    my ($ms) = @_;
    ::debug(int($ms),"ms ");
    my $seconds = $ms/1000;
    # 4-argument select with no handles is a sub-second sleep
    select(undef, undef, undef, $seconds);
}
# Returns the time since the epoch in seconds, truncated to 3 decimals.
# Uses:
#   %Global::use (records that the Time::HiRes setup has been done)
# On first call defines TimeHiRestime() in the current package: backed
# by Time::HiRes::time if Time::HiRes can be loaded, otherwise by time().
sub now {
    if(not $Global::use{"Time::HiRes"}) {
        # A bare 'use' statement gives eval no true value to return, so
        # the explicit "1" is needed for the success branch to be taken.
        if(eval "use Time::HiRes qw ( time ); 1") {
            eval "sub TimeHiRestime { return Time::HiRes::time() };";
        } else {
            eval "sub TimeHiRestime { return time() };";
        }
        $Global::use{"Time::HiRes"} = 1;
    }
    return (int(TimeHiRestime()*1000))/1000;
}
# Expand size prefixes in a string into multiplications and evaluate it.
#   ki mi gi ti pi ei zi yi xi (any case) => *1024 repeated 1..9 times
#   K M G T P E Z Y X                     => *1024 repeated 1..9 times
#   k m g t p e z y x                     => *1000 repeated 1..9 times
# Input:
#   $s = string with prefixes
# Returns:
#   result of evaluating the rewritten string as a Perl expression
sub multiply_binary_prefix {
    my ($expr) = @_;
    my @prefixes = qw(k m g t p e z y x);
    # Pass 1: two-letter binary prefixes, case-insensitive
    for my $i (0..$#prefixes) {
        my $factor = "*1024" x ($i+1);
        my $prefix = $prefixes[$i]."i";
        $expr =~ s/$prefix/$factor/gi;
    }
    # Pass 2: single upper-case letters are powers of 1024
    for my $i (0..$#prefixes) {
        my $factor = "*1024" x ($i+1);
        my $prefix = uc $prefixes[$i];
        $expr =~ s/$prefix/$factor/g;
    }
    # Pass 3: single lower-case letters are powers of 1000
    for my $i (0..$#prefixes) {
        my $factor = "*1000" x ($i+1);
        my $prefix = $prefixes[$i];
        $expr =~ s/$prefix/$factor/g;
    }
    $expr = eval $expr;
    ::debug($expr);
    return $expr;
}
# Create a temporary file named parXXXXX in $TMPDIR.
# Input:
#   @_ = extra options passed on to File::Temp's tempfile()
# Returns:
#   whatever tempfile() returns
sub tmpfile {
    my @extra_options = @_;
    return ::tempfile(DIR => $ENV{'TMPDIR'},
                      TEMPLATE => 'parXXXXX',
                      @extra_options);
}
# Intentionally empty sub; it appears to serve only as a named section
# marker for the debugging helpers that follow.
sub __DEBUGGING__ {}
# Print debug output if debugging is enabled for the category.
# Uses:
#   $Global::debug = false (off), "all", or a single category name
#   %Global::fd    = if $Global::fd{1} is set, print to that handle
# Input:
#   $_[0]     = category of this message
#   @_[1..$#_] = strings to print; undef is printed as ""
sub debug {
    $Global::debug or return;
    # map (not grep): replace undef by "" without dropping false
    # values such as 0 or "", which would also shift the category.
    my ($category, @messages) = map { defined $_ ? $_ : "" } @_;
    if($Global::debug eq "all" or $Global::debug eq $category) {
        if($Global::fd{1}) {
            my $stdout = $Global::fd{1};
            print $stdout @messages;
        } else {
            print @messages;
        }
    }
}
# Returns the value of field 23 of /proc/<own pid>/stat (the virtual
# memory size according to proc(5)), or 0 if it cannot be read.
sub my_memory_usage {
    use strict;
    use FileHandle;
    my $stat_file = "/proc/$$/stat";
    -e $stat_file or return 0;
    # The file may exist but be unreadable: do not read from undef
    my $fh = FileHandle->new("<$stat_file") or return 0;
    my $data = <$fh>;
    $fh->close;
    defined $data or return 0;
    chomp $data;
    # The 2nd field is "(command name)" and may itself contain
    # whitespace. Strip "pid (comm) " up to the last ')' so the
    # remaining fields can be split safely.
    my $fields_removed = 0;
    if($data =~ s/^\d+\s+\(.*\)\s+//s) {
        $fields_removed = 2;
    }
    my @procinfo = split(/\s+/,$data);
    my $vsize = $procinfo[22 - $fields_removed];
    # Same mapping as undef_as_zero(): any false value becomes 0
    return $vsize ? $vsize : 0;
}
# Returns Devel::Size::total_size() of the arguments,
# or -1 if Devel::Size cannot be loaded.
sub my_size {
    eval "use Devel::Size qw(size total_size)";
    return -1 if $@;
    return total_size(@_);
}
# Returns a string rendering of the arguments for debugging.
# Prefers Data::Dump; falls back to Data::Dumper. If neither can be
# loaded, prints a message to $Global::original_stderr and returns it.
sub my_dump {
    my @dump_this = (@_);
    eval "use Data::Dump qw(dump);";
    if(not $@) {
        # Data::Dump loaded. (A former 'eval "sub Data::Dump:dump {}"'
        # here was a syntax error inside the eval and never had any
        # effect, so it and the repeated 'use' are gone.)
        return (Data::Dump::dump(@dump_this));
    }
    eval "use Data::Dumper;";
    if(not $@) {
        return Dumper(@dump_this);
    }
    my $err = "Neither Data::Dump nor Data::Dumper is installed\n".
        "Not dumping output\n";
    print $Global::original_stderr $err;
    return $err;
}
# Load Carp on demand, switch on verbose traces ($Carp::Verbose)
# and croak with the given message.
sub my_croak {
    my @message = @_;
    eval "use Carp; 1";
    $Carp::Verbose = 1;
    croak(@message);
}
# Load Carp on demand, switch on verbose traces ($Carp::Verbose)
# and carp (warn) with the given message.
sub my_carp {
    my @message = @_;
    eval "use Carp; 1";
    $Carp::Verbose = 1;
    carp(@message);
}
# Intentionally empty sub; it appears to serve only as a named section
# marker for the packages (SSHLogin, JobQueue, Job, ...) that follow.
sub __OBJECT_ORIENTED_PARTS__ {}
package SSHLogin;
# Create an SSHLogin object from a string of the form
#   [@hostgroup[+hostgroup...]/][ncpus/]sshlogin
# The sshlogin string itself is always added as a hostgroup, and all
# hostgroups are recorded in %Global::hostgroups.
sub new {
    my ($class, $sshlogin_string) = @_;
    my ($ncpus, %hostgroups);
    # Leading "@grp1+grp2/" names the hostgroups
    if($sshlogin_string =~ s:^\@([^/]+)/?::) {
        for my $group (split(/\+/, $1)) {
            $hostgroups{$group} = 1;
        }
    }
    # Then an optional "N/" overrides the number of CPUs
    if($sshlogin_string =~ s:^(\d+)/::) {
        $ncpus = $1;
    }
    my $string = $sshlogin_string;
    $hostgroups{$string} = 1;
    for my $group (keys %hostgroups) {
        $Global::hostgroups{$group} = $hostgroups{$group};
    }
    # File-name safe version of the sshlogin for the cache files
    (my $no_slash_string = $string) =~ s/[^-a-z0-9:]/_/gi;
    my $cache_dir = $ENV{'HOME'} . "/.parallel/tmp/";
    my %self = (
        'string' => $string,
        'jobs_running' => 0,
        'jobs_completed' => 0,
        'maxlength' => undef,
        'max_jobs_running' => undef,
        'orig_max_jobs_running' => undef,
        'ncpus' => $ncpus,
        'hostgroups' => \%hostgroups,
        'sshcommand' => undef,
        'serverlogin' => undef,
        'control_path_dir' => undef,
        'control_path' => undef,
        'time_to_login' => undef,
        'last_login_at' => undef,
        'loadavg_file' => $cache_dir . "loadavg-" . $no_slash_string,
        'loadavg' => undef,
        'last_loadavg_update' => 0,
        'swap_activity_file' => $cache_dir . "swap_activity-" . $no_slash_string,
        'swap_activity' => undef,
    );
    return bless \%self, ref($class) || $class;
}
# Destructor: remove this host's loadavg and swap_activity cache files.
sub DESTROY {
    my ($self) = @_;
    for my $key ('loadavg_file', 'swap_activity_file') {
        unlink $self->{$key};
    }
}
# Accessor: the sshlogin string this object was created from.
sub string {
    my ($self) = @_;
    return $self->{'string'};
}
# Accessor: number of jobs running on this host ("0" if unset or zero).
sub jobs_running {
    my ($self) = @_;
    my $running = $self->{'jobs_running'};
    return $running ? $running : "0";
}
# Count one more job as running on this host.
sub inc_jobs_running {
    my ($self) = @_;
    $self->{'jobs_running'}++;
}
# Count one job less as running on this host.
sub dec_jobs_running {
    my ($self) = @_;
    $self->{'jobs_running'}--;
}
# Setter for 'maxlength'.
sub set_maxlength {
    my ($self, $maxlength) = @_;
    $self->{'maxlength'} = $maxlength;
}
# Accessor for 'maxlength'.
sub maxlength {
    my ($self) = @_;
    return $self->{'maxlength'};
}
# Accessor: number of jobs completed on this host.
sub jobs_completed {
    my ($self) = @_;
    return $self->{'jobs_completed'};
}
# Input:
#   @_ = hostgroup names to test
# Returns:
#   the names that are hostgroups of this host
#   (their count in scalar context)
sub in_hostgroups {
    my ($self, @candidates) = @_;
    my $member_of = $self->{'hostgroups'};
    my @matching = grep { defined $member_of->{$_} } @candidates;
    return @matching;
}
# Returns the names of the hostgroups this host belongs to
# (their count in scalar context).
sub hostgroups {
    my ($self) = @_;
    my @names = keys %{$self->{'hostgroups'}};
    return @names;
}
# Count one more job as completed on this host.
sub inc_jobs_completed {
    my ($self) = @_;
    $self->{'jobs_completed'}++;
}
# Set this host's max number of simultaneous jobs and keep the total
# in $Global::max_jobs_running in sync. The first true value ever set
# is remembered as 'orig_max_jobs_running'.
sub set_max_jobs_running {
    my ($self, $new_max) = @_;
    my $old_max = $self->{'max_jobs_running'};
    # Replace this host's old contribution to the global sum
    $Global::max_jobs_running -= $old_max if defined $old_max;
    $self->{'max_jobs_running'} = $new_max;
    $Global::max_jobs_running += $new_max if defined $new_max;
    $self->{'orig_max_jobs_running'} ||= $self->{'max_jobs_running'};
}
# Returns true if the host is swapping or if the swap activity is
# not known yet; otherwise the (false) swap activity value.
sub swapping {
    my ($self) = @_;
    my $activity = $self->swap_activity();
    return 1 unless defined $activity;
    return $activity;
}
# Returns the last swap activity value read from this host's
# swap_activity_file, or undef if no readable file exists yet.
# If the file is missing, or the last update was started more than
# 10 seconds ago, a background shell job is started that rewrites the
# file (over ssh unless the sshlogin is ":"). The value returned is
# the one read before that refresh.
sub swap_activity {
my $self = shift;
my $update_swap_activity_file = 0;
if(-r $self->{'swap_activity_file'}) {
open(my $swap_fh, "<", $self->{'swap_activity_file'}) || ::die_bug("swap_activity_file-r");
my $swap_out = <$swap_fh>;
close $swap_fh;
# Only a line consisting solely of digits updates the cached value
if($swap_out =~ /^(\d+)$/) {
$self->{'swap_activity'} = $1;
::debug("swap", "New swap_activity: ", $self->{'swap_activity'});
}
::debug("swap", "Last update: ", $self->{'last_swap_activity_update'});
if(time - $self->{'last_swap_activity_update'} > 10) {
::debug("swap", "Older than 10 sec: ", $self->{'swap_activity_file'});
$update_swap_activity_file = 1;
}
} else {
::debug("swap", "No swap_activity file: ", $self->{'swap_activity_file'});
$self->{'swap_activity'} = undef;
$update_swap_activity_file = 1;
}
if($update_swap_activity_file) {
::debug("swap", "Updating swap_activity file ", $self->{'swap_activity_file'});
$self->{'last_swap_activity_update'} = time;
# Make sure the directory for the cache files exists
-e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel";
-e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp";
my $swap_activity;
$swap_activity = swapactivityscript();
if($self->{'string'} ne ":") {
# Not the local host: wrap the script in an ssh command
$swap_activity = $self->sshcommand() . " " . $self->serverlogin() . " " .
::shell_quote_scalar($swap_activity);
}
my $file = $self->{'swap_activity_file'};
my ($dummy_fh, $tmpfile) = ::tmpfile(SUFFIX => ".swp");
::debug("swap", "\n", $swap_activity, "\n");
# Run in the background (&): write to a temp file and move it into
# place only if the command succeeded; otherwise remove the temp file.
qx{ ($swap_activity > $tmpfile && mv $tmpfile $file || rm $tmpfile) & };
}
return $self->{'swap_activity'};
}
{
# Cached shell command, built on the first call
my $script;
# Returns a shell command ($Global::envvar followed by "perl -e ...").
# The perl code holds one 'if($^O eq "<os>")' statement per entry in
# %vmstat; the matching one prints the output of that OS's vmstat
# command piped through awk, which prints the product of two columns.
sub swapactivityscript {
if(not $script) {
# OS name => [ vmstat command, awk expression multiplying two columns ]
# NOTE(review): the columns are presumably swap-in and swap-out;
# confirm against each OS's vmstat output.
my %vmstat = (
'linux' => ['vmstat 1 2 | tail -n1', '$7*$8'],
'solaris' => ['vmstat -S 1 2 | tail -1', '$6*$7'],
'darwin' => ['vm_stat -c 2 1 | tail -n1', '$21*$22'],
'ultrix' => ['vmstat -S 1 2 | tail -1', '$12*$13'],
'aix' => ['vmstat 1 2 | tail -n1', '$6*$7'],
'freebsd' => ['vmstat -H 1 2 | tail -n1', '$8*$9'],
'mirbsd' => ['vmstat 1 2 | tail -n1', '$8*$9'],
'netbsd' => ['vmstat 1 2 | tail -n1', '$7*$8'],
'openbsd' => ['vmstat 1 2 | tail -n1', '$8*$9'],
'hpux' => ['vmstat 1 2 | tail -n1', '$8*$9'],
'dec_osf' => ['vmstat 1 2 | tail -n1', '$11*$12'],
'gnu' => ['vmstat -k 1 2 | tail -n1', '$7*$8'],
);
my $perlscript = "";
for my $os (keys %vmstat) {
# Each '$' in the awk expression is prefixed with three backslashes
# before it is embedded in the generated perl code (the expression
# ends up inside perl backticks and awk double quotes).
$vmstat{$os}[1] =~ s/\$/\\\\\\\$/g; $perlscript .= 'if($^O eq "'.$os.'") { print `'.$vmstat{$os}[0].' | awk "{print ' .
$vmstat{$os}[1] . '}"` }';
}
$perlscript = "perl -e " . ::shell_quote_scalar($perlscript);
$script = $Global::envvar. " " .$perlscript;
}
return $script;
}
}
# Returns true if less than a fifth of the measured login time has
# passed since the last login to this host; 0 if either the last
# login time or the login time is unknown.
sub too_fast_remote_login {
    my ($self) = @_;
    my $last_login = $self->{'last_login_at'};
    my $login_time = $self->{'time_to_login'};
    return 0 unless $last_login and $login_time;
    my $earliest_next_login = $last_login + $login_time/5;
    my $too_fast = (::now() <= $earliest_next_login);
    ::debug("run", "Too fast? $too_fast ");
    return $too_fast;
}
# Accessor for 'last_login_at'.
sub last_login_at {
    my ($self) = @_;
    return $self->{'last_login_at'};
}
# Setter for 'last_login_at'.
sub set_last_login_at {
    my ($self, $timestamp) = @_;
    $self->{'last_login_at'} = $timestamp;
}
# Returns true if the load average of this host is unknown
# or above max_loadavg().
sub loadavg_too_high {
    my ($self) = @_;
    my $current_load = $self->loadavg();
    defined $current_load or return 1;
    return $current_load > $self->max_loadavg();
}
# Returns the load of this host as computed from its loadavg_file:
# the number of lines matching /^[DR]....[^\[]/ minus 1, or undef if
# the file cannot be opened yet.
# If the file is missing, or the last update was started more than
# 10 seconds ago, a background shell job is started that rewrites the
# file with the output of "ps ax -o state,command" (over ssh unless
# the sshlogin is ":"). The value returned is the one read before
# that refresh.
sub loadavg {
my $self = shift;
my $update_loadavg_file = 0;
if(open(my $load_fh, "<", $self->{'loadavg_file'})) {
# Slurp the whole file
local $/ = undef;
my $load_out = <$load_fh>;
close $load_fh;
# Count the lines starting with state D or R whose 6th character
# is not '['
my $load =()= ($load_out=~/(^[DR]....[^\[])/gm);
if($load > 0) {
# NOTE(review): the -1 presumably discounts the ps process itself
$self->{'loadavg'} = $load - 1;
::debug("load", "New loadavg: ", $self->{'loadavg'});
} else {
::die_bug("loadavg_invalid_content: $load_out");
}
::debug("load", "Last update: ", $self->{'last_loadavg_update'});
if(time - $self->{'last_loadavg_update'} > 10) {
::debug("load", time - $self->{'last_loadavg_update'}, " secs old: ",
$self->{'loadavg_file'});
$update_loadavg_file = 1;
}
} else {
::debug("load", "No loadavg file: ", $self->{'loadavg_file'});
$self->{'loadavg'} = undef;
$update_loadavg_file = 1;
}
if($update_loadavg_file) {
::debug("load", "Updating loadavg file", $self->{'loadavg_file'}, "\n");
$self->{'last_loadavg_update'} = time;
# Make sure the directory for the cache files exists
-e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel";
-e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp";
my $cmd = "";
if($self->{'string'} ne ":") {
# Not the local host: run ps through ssh
$cmd = $self->sshcommand() . " " . $self->serverlogin() . " ";
}
$cmd .= "ps ax -o state,command";
my $file = $self->{'loadavg_file'};
my ($dummy_fh, $tmpfile) = ::tmpfile(SUFFIX => ".loa");
# Run in the background (&): write to a temp file and move it into
# place only if the command succeeded; otherwise remove the temp file.
qx{ ($cmd > $tmpfile && mv $tmpfile $file || rm $tmpfile) & };
}
return $self->{'loadavg'};
}
# Returns the max load average allowed on this host, computing it
# from $opt::load on first use. If $Global::max_load_file has been
# modified since it was last seen, the cached value of every host in
# %Global::host is reset first, so it is recomputed.
sub max_loadavg {
    my ($self) = @_;
    if($Global::max_load_file) {
        my $mtime = (stat($Global::max_load_file))[9];
        if($mtime > $Global::max_load_file_last_mod) {
            $Global::max_load_file_last_mod = $mtime;
            for my $sshlogin (values %Global::host) {
                $sshlogin->set_max_loadavg(undef);
            }
        }
    }
    unless(defined $self->{'max_loadavg'}) {
        my $computed = $self->compute_max_loadavg($opt::load);
        $self->{'max_loadavg'} = $computed;
    }
    ::debug("load", "max_loadavg: ", $self->string(), " ", $self->{'max_loadavg'});
    return $self->{'max_loadavg'};
}
# Setter for 'max_loadavg'.
sub set_max_loadavg {
    my ($self, $max_loadavg) = @_;
    $self->{'max_loadavg'} = $max_loadavg;
}
# Parse a --load specification.
# Input:
#   $loadspec = "+N" (ncpus+N), "-N" (ncpus-N), "N%" (percent of
#               ncpus), a number, or the name of a file containing
#               one of these
# Returns:
#   max load allowed (never below 0.01); undef if $loadspec is undef
sub compute_max_loadavg {
    my ($self, $loadspec) = @_;
    defined $loadspec or return undef;
    my $load;
    if($loadspec =~ /^\+(\d+)$/) {
        my $extra = $1;
        $load = $self->ncpus() + $extra;
    } elsif($loadspec =~ /^-(\d+)$/) {
        my $fewer = $1;
        $load = $self->ncpus() - $fewer;
    } elsif($loadspec =~ /^(\d+)\%$/) {
        my $percent = $1;
        $load = $self->ncpus() * $percent / 100;
    } elsif($loadspec =~ /^(\d+(\.\d+)?)$/) {
        $load = $1;
    } elsif(-f $loadspec) {
        # The spec is a file: remember it (and its mtime) and parse
        # its content instead
        $Global::max_load_file = $loadspec;
        $Global::max_load_file_last_mod = (stat($Global::max_load_file))[9];
        if(open(my $in_fh, "<", $Global::max_load_file)) {
            my $opt_load_file = join("",<$in_fh>);
            close $in_fh;
            $load = $self->compute_max_loadavg($opt_load_file);
        } else {
            print $Global::original_stderr "Cannot open $loadspec\n";
            ::wait_and_exit(255);
        }
    } else {
        print $Global::original_stderr "Parsing of --load failed\n";
        ::die_usage();
    }
    return $load < 0.01 ? 0.01 : $load;
}
# Accessor for 'time_to_login'.
sub time_to_login {
    my ($self) = @_;
    return $self->{'time_to_login'};
}
# Setter for 'time_to_login'.
sub set_time_to_login {
    my ($self, $seconds) = @_;
    $self->{'time_to_login'} = $seconds;
}
# Returns the max number of simultaneous jobs for this host,
# computing it from $opt::jobs (and storing it) on first use.
sub max_jobs_running {
    my ($self) = @_;
    unless(defined $self->{'max_jobs_running'}) {
        my $limit = $self->compute_number_of_processes($opt::jobs);
        $self->set_max_jobs_running($limit);
    }
    return $self->{'max_jobs_running'};
}
# Accessor for 'orig_max_jobs_running'.
sub orig_max_jobs_running {
    my ($self) = @_;
    return $self->{'orig_max_jobs_running'};
}
# Number of processes wanted, limited by what the system allows.
# Input:
#   $opt_P = the job specification (passed on to
#            user_requested_processes())
# Returns:
#   number of processes to run on this host
sub compute_number_of_processes {
    my ($self, $opt_P) = @_;
    my $wanted_processes = $self->user_requested_processes($opt_P);
    defined $wanted_processes
        or $wanted_processes = $Global::default_simultaneous_sshlogins;
    ::debug("load", "Wanted procs: $wanted_processes\n");
    my $allowed_processes =
        $self->processes_available_by_system_limit($wanted_processes);
    ::debug("load", "Limited to procs: $allowed_processes\n");
    return $allowed_processes;
}
# Find out how many of the wanted processes the system can actually
# run, by opening file handles and forking sleeping children until
# the wanted number is reached, open() fails, fork() fails or there
# are no more jobs to read.
# Input:
#   $wanted_processes = number of processes wanted
# Returns:
#   $system_limit = number of processes that can be run; for a remote
#     host this may be lowered further by simultaneous_sshlogin_limit()
# Jobs and arguments read while probing are put back (unget) afterwards.
sub processes_available_by_system_limit {
my $self = shift;
my $wanted_processes = shift;
my $system_limit = 0;
my @jobs = ();
my $job;
my @args = ();
my $arg;
my $more_filehandles = 1;
my $max_system_proc_reached = 0;
my $slow_spawining_warning_printed = 0;
my $time = time;
my %fh;
my @children;
# Open 12 file handles up front
# NOTE(review): presumably headroom for handles needed besides the
# per-job ones; confirm.
for my $i (1..12) {
open($fh{"init-$i"}, "<", "/dev/null");
}
# Fork 2 sleeping children before counting starts
# NOTE(review): presumably headroom for helper processes; confirm.
for(1..2) {
my $child;
if($child = fork()) {
push (@children,$child);
$Global::unkilled_children{$child} = 1;
} elsif(defined $child) {
# Child: restore the original TERM handler and sleep
$SIG{TERM} = $Global::original_sig{TERM};
sleep 10000000;
exit(0);
} else {
# fork() failed
$max_system_proc_reached = 1;
}
}
my $count_jobs_already_read = $Global::JobQueue->next_seq();
my $wait_time_for_getting_args = 0;
my $start_time = time;
while(1) {
$system_limit >= $wanted_processes and last;
not $more_filehandles and last;
$max_system_proc_reached and last;
my $before_getting_arg = time;
if($Global::semaphore or $opt::pipe) {
# Nothing is read from the queue in these modes
} elsif(defined $opt::retries and $count_jobs_already_read) {
# With --retries: count the jobs already read instead of reading more
$count_jobs_already_read--;
} else {
if($opt::X or $opt::m) {
# Read single arguments while there are any; then whole jobs
if($Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->empty()) {
if($Global::JobQueue->empty()) {
last;
} else {
($job) = $Global::JobQueue->get();
push(@jobs, $job);
}
} else {
($arg) = $Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->get();
push(@args, $arg);
}
} else {
# No more jobs: no reason to probe for more processes
$Global::JobQueue->empty() and last;
($job) = $Global::JobQueue->get();
push(@jobs, $job);
}
}
# Time spent reading arguments is not counted as fork time
$wait_time_for_getting_args += time - $before_getting_arg;
$system_limit++;
# Open 4 file handles per process; stop probing when open() fails
$more_filehandles = open($fh{$system_limit*10}, "<", "/dev/null")
&& open($fh{$system_limit*10+2}, "<", "/dev/null")
&& open($fh{$system_limit*10+3}, "<", "/dev/null")
&& open($fh{$system_limit*10+4}, "<", "/dev/null");
my $child;
if($child = fork()) {
push (@children,$child);
$Global::unkilled_children{$child} = 1;
} elsif(defined $child) {
# Child: restore the original TERM handler and sleep
$SIG{TERM} = $Global::original_sig{TERM};
sleep 10000000;
exit(0);
} else {
# fork() failed
$max_system_proc_reached = 1;
}
my $forktime = time - $time - $wait_time_for_getting_args;
::debug("run", "Time to fork $system_limit procs: $wait_time_for_getting_args ",
$forktime,
" (processes so far: ", $system_limit,")\n");
# Warn once if forking is slow (> 1 sec in total and > 10 ms per process)
if($system_limit > 10 and
$forktime > 1 and
$forktime > $system_limit * 0.01
and not $slow_spawining_warning_printed) {
print $Global::original_stderr
("parallel: Warning: Starting $system_limit processes took > $forktime sec.\n",
"Consider adjusting -j. Press CTRL-C to stop.\n");
$slow_spawining_warning_printed = 1;
}
}
# Clean up: close the probe file handles and kill the sleeping children
for (values %fh) { close $_ }
for my $pid (@children) {
kill 9, $pid;
waitpid($pid,0);
delete $Global::unkilled_children{$pid};
}
# Put back what was read while probing
$Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->unget(@args);
$Global::JobQueue->unget(@jobs);
if($system_limit < $wanted_processes) {
# The system gave less than wanted: tell the user why
if($system_limit < 1 and not $Global::JobQueue->empty()) {
::warning("Cannot spawn any jobs. Raising ulimit -u or /etc/security/limits.conf\n",
"or /proc/sys/kernel/pid_max may help.\n");
::wait_and_exit(255);
}
if(not $more_filehandles) {
::warning("Only enough file handles to run ", $system_limit, " jobs in parallel.\n",
"Running 'parallel -j0 -N", $system_limit, " --pipe parallel -j0' or ",
"raising ulimit -n or /etc/security/limits.conf may help.\n");
}
if($max_system_proc_reached) {
::warning("Only enough available processes to run ", $system_limit,
" jobs in parallel. Raising ulimit -u or /etc/security/limits.conf\n",
"or /proc/sys/kernel/pid_max may help.\n");
}
}
# Perl 5.8.8 is capped at 1000 processes
# NOTE(review): the reason for this cap is not visible here.
if($] == 5.008008 and $system_limit > 1000) {
$system_limit = 1000;
}
if($Global::JobQueue->empty()) {
# No jobs left: return at least 1
$system_limit ||= 1;
}
# Remote host: the ssh server may allow fewer simultaneous logins
if($self->string() ne ":" and
$system_limit > $Global::default_simultaneous_sshlogins) {
$system_limit =
$self->simultaneous_sshlogin_limit($system_limit);
}
return $system_limit;
}
# Test how many simultaneous ssh logins the host accepts.
# Input:
#   $wanted_processes = number of logins wanted
# Returns:
#   $wanted_processes unchanged if 'time_to_login' is already set;
#   otherwise the lower result of two probes, minus 1 when it is
#   above 1.
sub simultaneous_sshlogin_limit {
    my ($self, $wanted_processes) = @_;
    return $wanted_processes if $self->{'time_to_login'};
    # Probe twice and use the lower result
    my $first_probe = $self->simultaneous_sshlogin($wanted_processes);
    my $second_probe = $self->simultaneous_sshlogin($wanted_processes);
    my $ssh_limit = ::min($first_probe, $second_probe);
    if($ssh_limit < $wanted_processes) {
        my $serverlogin = $self->serverlogin();
        ::warning("ssh to $serverlogin only allows ",
                  "for $ssh_limit simultaneous logins.\n",
                  "You may raise this by changing ",
                  "/etc/ssh/sshd_config:MaxStartups and MaxSessions on $serverlogin.\n",
                  "Using only ",$ssh_limit-1," connections ",
                  "to avoid race conditions.\n");
    }
    $ssh_limit -= 1 if $ssh_limit > 1;
    return $ssh_limit;
}
# Start $wanted_processes ssh logins in parallel and count how many
# of them manage to echo "simultaneouslogin".
# Input:
#   $wanted_processes = number of logins to try
# Returns:
#   the count printed by wc -l (newline removed)
sub simultaneous_sshlogin {
    my ($self, $wanted_processes) = @_;
    my $sshcmd = $self->sshcommand();
    my $serverlogin = $self->serverlogin();
    my $sshdelay = "";
    if($opt::sshdelay) {
        $sshdelay = "sleep $opt::sshdelay;";
    }
    # One backgrounded login attempt, repeated once per wanted process
    my $single_login = "$sshdelay$sshcmd $serverlogin echo simultaneouslogin </dev/null 2>&1 &";
    my $cmd = $single_login x $wanted_processes;
    ::debug("init", "Trying $wanted_processes logins at $serverlogin\n");
    open (my $simul_fh, "-|", "($cmd)|grep simultaneouslogin | wc -l") or
        ::die_bug("simultaneouslogin");
    my $ssh_limit = <$simul_fh>;
    close $simul_fh;
    chomp $ssh_limit;
    return $ssh_limit;
}
# Setter for 'ncpus'.
sub set_ncpus {
    my ($self, $ncpus) = @_;
    $self->{'ncpus'} = $ncpus;
}
# Parse the number of processes requested with --jobs.
# Input:
#   $opt_P = "+N" (ncpus+N), "-N" (ncpus-N), "N%" (percent of ncpus,
#            decimals allowed), "N" (0 means $Global::infinity), or
#            the name of a file containing one of these
# Returns:
#   number of processes, rounded up; undef if $opt_P is undef
sub user_requested_processes {
    my ($self, $opt_P) = @_;
    defined $opt_P or return undef;
    my $processes;
    if($opt_P =~ /^\+(\d+)$/) {
        my $extra = $1;
        $processes = $self->ncpus() + $extra;
    } elsif($opt_P =~ /^-(\d+)$/) {
        my $fewer = $1;
        $processes = $self->ncpus() - $fewer;
    } elsif($opt_P =~ /^(\d+(\.\d+)?)\%$/) {
        my $percent = $1;
        $processes = $self->ncpus() * $percent / 100;
    } elsif($opt_P =~ /^(\d+)$/) {
        $processes = $1;
        $processes = $Global::infinity if $processes == 0;
    } elsif(-f $opt_P) {
        # The spec is a file: remember it (and its mtime) and parse
        # its content instead
        $Global::max_procs_file = $opt_P;
        $Global::max_procs_file_last_mod = (stat($Global::max_procs_file))[9];
        if(open(my $in_fh, "<", $Global::max_procs_file)) {
            my $opt_P_file = join("",<$in_fh>);
            close $in_fh;
            $processes = $self->user_requested_processes($opt_P_file);
        } else {
            ::error("Cannot open $opt_P.\n");
            ::wait_and_exit(255);
        }
    } else {
        ::error("Parsing of --jobs/-j/--max-procs/-P failed.\n");
        ::die_usage();
    }
    return ::ceil($processes);
}
# Returns the number of CPUs (or cores) of this host, detecting and
# caching it on first use: locally via no_of_cpus()/no_of_cores(),
# remotely by running "parallel --number-of-cpus/--number-of-cores"
# through ssh. Falls back to 1 if the remote answer is not a number.
sub ncpus {
    my ($self) = @_;
    return $self->{'ncpus'} if defined $self->{'ncpus'};
    my $sshcmd = $self->sshcommand();
    my $serverlogin = $self->serverlogin();
    if($serverlogin eq ":") {
        # Local machine
        $self->{'ncpus'} = $opt::use_cpus_instead_of_cores
            ? no_of_cpus() : no_of_cores();
        return $self->{'ncpus'};
    }
    # Remote machine: ask the parallel installed there
    my $sqe = ::shell_quote_scalar($Global::envvar);
    my $remote_cmd;
    if($opt::use_cpus_instead_of_cores) {
        $remote_cmd = qq(echo|$sshcmd $serverlogin $sqe parallel --number-of-cpus);
    } else {
        $remote_cmd = qq(echo|$sshcmd $serverlogin $sqe parallel --number-of-cores);
        ::debug("init", $remote_cmd."\n");
    }
    my $ncpu = qx($remote_cmd);
    chomp $ncpu;
    if($ncpu =~ /^\s*[0-9]+\s*$/s) {
        $self->{'ncpus'} = $ncpu;
    } else {
        ::warning("Could not figure out ",
                  "number of cpus on $serverlogin ($ncpu). Using 1.\n");
        $self->{'ncpus'} = 1;
    }
    return $self->{'ncpus'};
}
# Returns the number of physical CPUs of the local machine.
# Uses the probe matching $^O; for an unknown OS every probe is tried
# in order, ending with nproc. Warns and returns 1 if nothing works.
sub no_of_cpus {
    local $/="\n";
    # [ OS name, probe ] in the order used for an unknown OS
    my @probes = (
        [ 'linux'     => \&no_of_cpus_gnu_linux ],
        [ 'freebsd'   => \&no_of_cpus_freebsd ],
        [ 'netbsd'    => \&no_of_cpus_netbsd ],
        [ 'openbsd'   => \&no_of_cpus_openbsd ],
        [ 'gnu'       => \&no_of_cpus_hurd ],
        [ 'darwin'    => \&no_of_cpus_darwin ],
        [ 'solaris'   => \&no_of_cpus_solaris ],
        [ 'aix'       => \&no_of_cpus_aix ],
        [ 'hpux'      => \&no_of_cpus_hpux ],
        [ 'nto'       => \&no_of_cpus_qnx ],
        [ 'svr5'      => \&no_of_cpus_openserver ],
        [ 'irix'      => \&no_of_cpus_irix ],
        [ 'dec_osf'   => \&no_of_cpus_tru64 ],
        );
    my $no_of_cpus;
    my ($known_os) = grep { $_->[0] eq $^O } @probes;
    if($known_os) {
        $no_of_cpus = $known_os->[1]->();
        if($^O eq 'linux') {
            # On GNU/Linux fall back to the number of cores
            $no_of_cpus ||= no_of_cores_gnu_linux();
        }
    } else {
        # Unknown OS: first probe giving a true value wins
        for my $probe ((map { $_->[1] } @probes), \&nproc) {
            $no_of_cpus = $probe->();
            $no_of_cpus and last;
        }
    }
    if($no_of_cpus) {
        chomp $no_of_cpus;
        return $no_of_cpus;
    }
    ::warning("Cannot figure out number of cpus. Using 1.\n");
    return 1;
}
# Returns the number of CPU cores of the local machine.
# Uses the probe matching $^O; for an unknown OS every probe is tried
# in order, ending with nproc. Warns and returns 1 if nothing works.
sub no_of_cores {
    local $/="\n";
    # [ OS name, probe ] in the order used for an unknown OS
    my @probes = (
        [ 'linux'     => \&no_of_cores_gnu_linux ],
        [ 'freebsd'   => \&no_of_cores_freebsd ],
        [ 'netbsd'    => \&no_of_cores_netbsd ],
        [ 'openbsd'   => \&no_of_cores_openbsd ],
        [ 'gnu'       => \&no_of_cores_hurd ],
        [ 'darwin'    => \&no_of_cores_darwin ],
        [ 'solaris'   => \&no_of_cores_solaris ],
        [ 'aix'       => \&no_of_cores_aix ],
        [ 'hpux'      => \&no_of_cores_hpux ],
        [ 'nto'       => \&no_of_cores_qnx ],
        [ 'svr5'      => \&no_of_cores_openserver ],
        [ 'irix'      => \&no_of_cores_irix ],
        [ 'dec_osf'   => \&no_of_cores_tru64 ],
        );
    my $no_of_cores;
    my ($known_os) = grep { $_->[0] eq $^O } @probes;
    if($known_os) {
        $no_of_cores = $known_os->[1]->();
    } else {
        # Unknown OS: first probe giving a true value wins
        for my $probe ((map { $_->[1] } @probes), \&nproc) {
            $no_of_cores = $probe->();
            $no_of_cores and last;
        }
    }
    if($no_of_cores) {
        chomp $no_of_cores;
        return $no_of_cores;
    }
    ::warning("Cannot figure out number of CPU cores. Using 1.\n");
    return 1;
}
# Returns the raw output of the nproc command (stderr discarded).
sub nproc {
    my $nproc_output = qx(nproc 2>/dev/null);
    return $nproc_output;
}
# Returns the number of distinct "physical id" values in
# /proc/cpuinfo; if there are none, the number of "processor" lines.
# undef if /proc/cpuinfo does not exist or cannot be opened.
sub no_of_cpus_gnu_linux {
    my ($physical_cpus, $processors);
    if(-e "/proc/cpuinfo") {
        ($physical_cpus, $processors) = (0, 0);
        my %seen_physical_id;
        open(my $cpuinfo_fh, "<", "/proc/cpuinfo") || return undef;
        while(<$cpuinfo_fh>) {
            # Count every physical id only once
            if(/^physical id.*[:](.*)/ and not $seen_physical_id{$1}++) {
                $physical_cpus++;
            }
            if(/^processor.*[:]/i) {
                $processors++;
            }
        }
        close $cpuinfo_fh;
    }
    return ($physical_cpus||$processors);
}
# Returns the number of "processor" lines in /proc/cpuinfo;
# undef if /proc/cpuinfo does not exist or cannot be opened.
sub no_of_cores_gnu_linux {
    my $processors;
    if(-e "/proc/cpuinfo") {
        $processors = 0;
        open(my $cpuinfo_fh, "<", "/proc/cpuinfo") || return undef;
        while(<$cpuinfo_fh>) {
            if(/^processor.*[:]/i) {
                $processors++;
            }
        }
        close $cpuinfo_fh;
    }
    return $processors;
}
# FreeBSD: number of distinct %parent values under sysctl dev.cpu;
# if that command gives a false value, the value of sysctl hw.ncpu.
sub no_of_cpus_freebsd {
    my $parent_count =
        `sysctl -a dev.cpu 2>/dev/null | grep \%parent | awk '{ print \$2 }' | uniq | wc -l | awk '{ print \$1 }'`;
    my $no_of_cpus = $parent_count
        || `sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`;
    chomp $no_of_cpus;
    return $no_of_cpus;
}
# FreeBSD: value of sysctl hw.ncpu; if that gives a false value,
# the logicalcpu value from sysctl -a hw.
sub no_of_cores_freebsd {
    my $hw_ncpu = `sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`;
    my $no_of_cores = $hw_ncpu
        || `sysctl -a hw 2>/dev/null | grep [^a-z]logicalcpu[^a-z] | awk '{ print \$2 }'`;
    chomp $no_of_cores;
    return $no_of_cores;
}
# NetBSD: value of sysctl hw.ncpu, newline removed.
sub no_of_cpus_netbsd {
    my $hw_ncpu = qx(sysctl -n hw.ncpu 2>/dev/null);
    chomp $hw_ncpu;
    return $hw_ncpu;
}
# NetBSD: value of sysctl hw.ncpu, newline removed.
sub no_of_cores_netbsd {
    my $hw_ncpu = qx(sysctl -n hw.ncpu 2>/dev/null);
    chomp $hw_ncpu;
    return $hw_ncpu;
}
# OpenBSD: value of sysctl hw.ncpu, newline removed.
sub no_of_cpus_openbsd {
    my $hw_ncpu = qx(sysctl -n hw.ncpu 2>/dev/null);
    chomp $hw_ncpu;
    return $hw_ncpu;
}
# OpenBSD: value of sysctl hw.ncpu, newline removed.
sub no_of_cores_openbsd {
    my $hw_ncpu = qx(sysctl -n hw.ncpu 2>/dev/null);
    chomp $hw_ncpu;
    return $hw_ncpu;
}
# GNU/Hurd: output of nproc, newline removed.
sub no_of_cpus_hurd {
    my $nproc_output = qx(nproc);
    chomp $nproc_output;
    return $nproc_output;
}
# GNU/Hurd: output of nproc, newline removed.
sub no_of_cores_hurd {
    my $nproc_output = qx(nproc);
    chomp $nproc_output;
    return $nproc_output;
}
# Mac OS X: value of sysctl hw.physicalcpu; if that gives a false
# value, the physicalcpu value from sysctl -a hw. Newline not removed.
sub no_of_cpus_darwin {
    my $physicalcpu = `sysctl -n hw.physicalcpu 2>/dev/null`;
    my $no_of_cpus = $physicalcpu
        || `sysctl -a hw 2>/dev/null | grep [^a-z]physicalcpu[^a-z] | awk '{ print \$2 }'`;
    return $no_of_cpus;
}
# Mac OS X: value of sysctl hw.logicalcpu; if that gives a false
# value, the logicalcpu value from sysctl -a hw. Newline not removed.
sub no_of_cores_darwin {
    my $logicalcpu = `sysctl -n hw.logicalcpu 2>/dev/null`;
    my $no_of_cores = $logicalcpu
        || `sysctl -a hw 2>/dev/null | grep [^a-z]logicalcpu[^a-z] | awk '{ print \$2 }'`;
    return $no_of_cores;
}
# Solaris: number of lines printed by psrinfo; if that is not
# executable or prints nothing, the number of "cpu..instance" lines
# from prtconf. undef if neither gives any lines.
sub no_of_cpus_solaris {
    my @probes = (
        [ "/usr/sbin/psrinfo", "/usr/sbin/psrinfo" ],
        [ "/usr/sbin/prtconf", "/usr/sbin/prtconf | grep cpu..instance" ],
        );
    for my $probe (@probes) {
        my ($binary, $cmd) = @$probe;
        -x $binary or next;
        my @lines = `$cmd`;
        return scalar(@lines) if @lines;
    }
    return undef;
}
# Solaris: number of lines printed by psrinfo; if that is not
# executable or prints nothing, the number of "cpu..instance" lines
# from prtconf. undef if neither gives any lines.
sub no_of_cores_solaris {
    my @probes = (
        [ "/usr/sbin/psrinfo", "/usr/sbin/psrinfo" ],
        [ "/usr/sbin/prtconf", "/usr/sbin/prtconf | grep cpu..instance" ],
        );
    for my $probe (@probes) {
        my ($binary, $cmd) = @$probe;
        -x $binary or next;
        my @lines = `$cmd`;
        return scalar(@lines) if @lines;
    }
    return undef;
}
# AIX: number of "proc" lines in the output of lscfg -vs.
# 0 if lscfg is not executable; undef if the pipe cannot be opened.
sub no_of_cpus_aix {
    -x "/usr/sbin/lscfg" or return 0;
    open(my $lscfg_fh, "-|", "/usr/sbin/lscfg -vs |grep proc | wc -l|tr -d ' '")
        || return undef;
    my $no_of_cpus = <$lscfg_fh>;
    chomp ($no_of_cpus);
    close $lscfg_fh;
    return $no_of_cpus;
}
# AIX: the lcpu= value printed by vmstat 1 1.
# undef if vmstat is not executable, the pipe cannot be opened or no
# lcpu= value is printed.
sub no_of_cores_aix {
    -x "/usr/bin/vmstat" or return undef;
    my $lcpu;
    open(my $vmstat_fh, "-|", "/usr/bin/vmstat 1 1") || return undef;
    while(<$vmstat_fh>) {
        if(/lcpu=([0-9]*) /) {
            $lcpu = $1;
        }
    }
    close $vmstat_fh;
    return $lcpu;
}
# HP-UX: the 'Locality Domain Count' value printed by mpsched -s.
sub no_of_cpus_hpux {
    my $locality_domains =
        qx(/usr/bin/mpsched -s 2>&1 | grep 'Locality Domain Count' | awk '{ print \$4 }');
    return $locality_domains;
}
# HP-UX: the 'Processor Count' value printed by mpsched -s.
sub no_of_cores_hpux {
    my $processor_count =
        qx(/usr/bin/mpsched -s 2>&1 | grep 'Processor Count' | awk '{ print \$3 }');
    return $processor_count;
}
# QNX: no detection is implemented; always returns 0.
sub no_of_cpus_qnx {
    return 0;
}
# QNX: no detection is implemented; always returns 0.
sub no_of_cores_qnx {
    return 0;
}
# SCO OpenServer: number of lines printed by psrinfo;
# 0 if psrinfo is not executable or prints nothing.
sub no_of_cpus_openserver {
    -x "/usr/sbin/psrinfo" or return 0;
    my @psrinfo_lines = `/usr/sbin/psrinfo`;
    return scalar(@psrinfo_lines) if @psrinfo_lines;
    return 0;
}
# SCO OpenServer: number of lines printed by psrinfo;
# 0 if psrinfo is not executable or prints nothing.
sub no_of_cores_openserver {
    -x "/usr/sbin/psrinfo" or return 0;
    my @psrinfo_lines = `/usr/sbin/psrinfo`;
    return scalar(@psrinfo_lines) if @psrinfo_lines;
    return 0;
}
# IRIX: first column of the hinv line(s) mentioning HZ and Processor.
sub no_of_cpus_irix {
    my $hinv_count = qx(hinv | grep HZ | grep Processor | awk '{print \$1}');
    return $hinv_count;
}
# IRIX: first column of the hinv line(s) mentioning HZ and Processor.
sub no_of_cores_irix {
    my $hinv_count = qx(hinv | grep HZ | grep Processor | awk '{print \$1}');
    return $hinv_count;
}
# Tru64: raw output of sizer -pr.
sub no_of_cpus_tru64 {
    my $sizer_output = qx(sizer -pr);
    return $sizer_output;
}
# Tru64: raw output of sizer -pr.
sub no_of_cores_tru64 {
    my $sizer_output = qx(sizer -pr);
    return $sizer_output;
}
# Returns the ssh command for this host, deriving it from the
# sshlogin string on first use.
sub sshcommand {
    my ($self) = @_;
    defined $self->{'sshcommand'} or $self->sshcommand_of_sshlogin();
    return $self->{'sshcommand'};
}
# Returns the login part (e.g. user@host) for this host, deriving it
# from the sshlogin string on first use.
sub serverlogin {
    my ($self) = @_;
    defined $self->{'serverlogin'} or $self->sshcommand_of_sshlogin();
    return $self->{'serverlogin'};
}
# Split the sshlogin string into ssh command and server login and
# store them in 'sshcommand' and 'serverlogin'.
#   "cmd with args host" => sshcommand "cmd with args", serverlogin "host"
#   "host"               => sshcommand "ssh" (or "ssh -S <control path>"
#                           if $opt::controlmaster is set)
# With $opt::controlmaster an ssh master process is started once per
# control path; its pid is recorded in %Global::sshmaster.
sub sshcommand_of_sshlogin {
    my $self = shift;
    my ($sshcmd, $serverlogin);
    if($self->{'string'} =~ /(.+) (\S+)$/) {
        # The user gave the ssh command explicitly
        $sshcmd = $1; $serverlogin = $2;
    } elsif($opt::controlmaster) {
        my $control_path = $self->control_path_dir()."/ssh-%r@%h:%p";
        $sshcmd = "ssh -S ".$control_path;
        $serverlogin = $self->{'string'};
        if(not $self->{'control_path'}{$control_path}++) {
            # Master is not running yet: start it
            my $pid = fork();
            if(not defined $pid) {
                # fork failed: we are still the parent, so we must not
                # fall through to the exec below. Go on without a master.
                ::warning("Cannot fork ssh master for $serverlogin: $!\n");
            } elsif($pid) {
                $Global::sshmaster{$pid} ||= 1;
            } else {
                $SIG{'TERM'} = undef;
                # The master prints "foo" forever: discard all output
                open(STDOUT,">","/dev/null");
                open(STDERR,">","/dev/null");
                open(STDIN,"<","/dev/null");
                my $sleep = ::shell_quote_scalar('$|=1;while(1){sleep 1;print "foo\n"}');
                my @master = ("ssh", "-tt", "-MTS", $control_path, $serverlogin, "perl", "-e", $sleep);
                # If exec fails the child must not continue running the
                # parent's code; _exit skips END blocks and destructors.
                exec(@master) or POSIX::_exit(255);
            }
        }
    } else {
        $sshcmd = "ssh"; $serverlogin = $self->{'string'};
    }
    $self->{'sshcommand'} = $sshcmd;
    $self->{'serverlogin'} = $serverlogin;
}
# Returns the directory holding the ssh control path sockets,
# creating it under $HOME/.parallel/tmp on first use. The directory
# is created by File::Temp with CLEANUP enabled.
sub control_path_dir {
    my ($self) = @_;
    return $self->{'control_path_dir'} if defined $self->{'control_path_dir'};
    my $parallel_dir = $ENV{'HOME'}."/.parallel";
    my $tmp_dir = $ENV{'HOME'}."/.parallel/tmp";
    -e $parallel_dir or mkdir $parallel_dir;
    -e $tmp_dir or mkdir $tmp_dir;
    $self->{'control_path_dir'} =
        File::Temp::tempdir($ENV{'HOME'}
                            . "/.parallel/tmp/control_path_dir-XXXX",
                            CLEANUP => 1);
    return $self->{'control_path_dir'};
}
# Build the shell command transferring a file to this host.
# Input:
#   $file    = file to transfer
#   $workdir = remote dir used as destination for relative file names
# Returns:
#   "( ssh ... mkdir -p destdir;rsync ... )", or "true" (after a
#   warning) if the file is not readable
sub rsync_transfer_cmd {
    my ($self, $file, $workdir) = @_;
    unless(-r $file) {
        ::warning($file, " is not readable and will not be transferred.\n");
        return "true";
    }
    # Absolute paths go to /, relative ones to the workdir
    my $rsync_destdir = ($file =~ m:^/:)
        ? "/" : ::shell_quote_file($workdir);
    my $quoted_file = ::shell_quote_file($file);
    my $sshcmd = $self->sshcommand();
    my $rsync_opt = "-rlDzR -e" . ::shell_quote_scalar($sshcmd);
    my $serverlogin = $self->serverlogin();
    my $mkdir_cmd = "$sshcmd $serverlogin mkdir -p $rsync_destdir;";
    my $rsync_cmd =
        rsync()." $rsync_opt $quoted_file $serverlogin:$rsync_destdir";
    return "( " . $mkdir_cmd . $rsync_cmd . " )";
}
# Build the shell command removing a transferred file from this host.
# Input:
#   $file    = file to remove
#   $workdir = remote workdir
# Returns:
#   ssh command that removes the file, then tries to rmdir its parent
#   dirs (deepest first); with --workdir ... the workdir is removed
#   with rm -rf as well.
sub cleanup_cmd {
    my ($self, $file, $workdir) = @_;
    my $remote_file = $file;
    if($remote_file =~ m:/\./:) {
        # foo/bar/./baz/quux => workdir/baz/quux
        $remote_file =~ s:.*/\./:$workdir/:;
    } elsif($remote_file =~ m:^[^/]:) {
        # Relative path: relative to the workdir
        $remote_file = $workdir."/".$remote_file;
    }
    # Every leading part of the dir, longest first, so rmdir can
    # remove them from the inside out
    my @rmdir;
    my $partial_dir = "";
    for my $component (split m:/:, ::dirname($remote_file)) {
        $partial_dir .= $component."/";
        unshift @rmdir, ::shell_quote_file($partial_dir);
    }
    my $rmdir = "";
    if(@rmdir) {
        $rmdir = "rmdir @rmdir 2>/dev/null;";
    }
    if(defined $opt::workdir and $opt::workdir eq "...") {
        $rmdir .= "rm -rf " . ::shell_quote_file($workdir).';';
    }
    my $quoted_file = ::shell_quote_file($remote_file);
    my $sshcmd = $self->sshcommand();
    my $serverlogin = $self->serverlogin();
    return "$sshcmd $serverlogin ".::shell_quote_scalar("(rm -f $quoted_file; $rmdir)");
}
{
    # Cached rsync command (filled on first call)
    my $rsync_cmd;
    # Returns the rsync command to use: "rsync --protocol 30" if
    # `rsync --version` reports version 3.1 or later, otherwise "rsync".
    sub rsync {
        return $rsync_cmd if $rsync_cmd;
        my @version_output = `rsync --version`;
        for (@version_output) {
            /version (\d+.\d+)(.\d+)?/ or next;
            $rsync_cmd = ($1 >= 3.1) ? "rsync --protocol 30" : "rsync";
        }
        $rsync_cmd or ::die_bug("Cannot figure out version of rsync: @version_output");
        return $rsync_cmd;
    }
}
package JobQueue;
# Create a JobQueue wrapping a CommandLineQueue built from the same
# arguments, with an empty unget list and an unknown job total.
sub new {
    my ($class, $commandref, $read_from, $context_replace,
        $max_number_of_args, $return_files) = @_;
    my $commandlinequeue = CommandLineQueue->new
        ($commandref, $read_from, $context_replace, $max_number_of_args,
         $return_files);
    my %self = (
        'unget' => [],
        'commandlinequeue' => $commandlinequeue,
        'total_jobs' => undef,
    );
    return bless \%self, ref($class) || $class;
}
# Returns the next job: an ungotten job if there is one, otherwise a
# new Job made from the next command line of the command line queue.
# undef if that queue gives no command line.
sub get {
    my ($self) = @_;
    my $ungotten = $self->{'unget'};
    if(@$ungotten) {
        return shift @$ungotten;
    }
    my $commandline = $self->{'commandlinequeue'}->get();
    defined $commandline or return undef;
    my $job = Job->new($commandline);
    return $job;
}
# Put jobs back at the front of the queue (in the order given).
sub unget {
    my ($self, @jobs) = @_;
    unshift @{$self->{'unget'}}, @jobs;
}
# Returns true if there are no ungotten jobs and the command line
# queue is empty.
sub empty {
    my ($self) = @_;
    my $nothing_ungotten = !@{$self->{'unget'}};
    my $empty = $nothing_ungotten
        && $self->{'commandlinequeue'}->empty();
    ::debug("run", "JobQueue->empty $empty ");
    return $empty;
}
# Returns the total number of jobs in the queue.
# On first call every job is read, counted and put back (unget); the
# count is cached in 'total_jobs'. If reading takes more than 10
# seconds a warning is printed once, and reading continues.
sub total_jobs {
    my $self = shift;
    if(not defined $self->{'total_jobs'}) {
        my @queue;
        my $start = time;
        my $warned = 0;
        while(my $job = $self->get()) {
            # Keep the job before anything else: a job fetched when the
            # time limit is passed must not be lost.
            push @queue, $job;
            if(not $warned and time - $start > 10) {
                ::warning("Reading all arguments takes longer than 10 seconds.\n");
                $opt::eta && ::warning("Consider removing --eta.\n");
                $opt::bar && ::warning("Consider removing --bar.\n");
                $warned = 1;
            }
        }
        $self->unget(@queue);
        $self->{'total_jobs'} = $#queue+1;
    }
    return $self->{'total_jobs'};
}
# Returns seq() of the command line queue.
sub next_seq {
    my ($self) = @_;
    my $commandlinequeue = $self->{'commandlinequeue'};
    return $commandlinequeue->seq();
}
# Returns quote_args() of the command line queue.
sub quote_args {
    my ($self) = @_;
    my $commandlinequeue = $self->{'commandlinequeue'};
    return $commandlinequeue->quote_args();
}
package Job;
# Create a Job for a command line object. All state starts out
# unset/zero; 'virgin' is 1.
sub new {
    my ($class, $commandlineref) = @_;
    my %self = (
        'commandline' => $commandlineref,
        'workdir' => undef,
        'stdin' => undef,
        'remaining' => "",
        'datawritten' => 0,
        'transfersize' => 0,
        'returnsize' => 0,
        'pid' => undef,
        'failed' => undef,
        'sshlogin' => undef,
        'sshlogin_wrap' => undef,
        'exitstatus' => undef,
        'exitsignal' => undef,
        'timeout' => undef,
        'virgin' => 1,
    );
    return bless \%self, ref($class) || $class;
}
# Returns replaced() of the job's command line object.
# It is a bug if the job has no command line.
sub replaced {
    my ($self) = @_;
    my $commandline = $self->{'commandline'};
    $commandline or ::die_bug("commandline empty");
    return $commandline->replaced();
}
# Returns seq() of the job's command line object.
sub seq {
    my ($self) = @_;
    my $commandline = $self->{'commandline'};
    return $commandline->seq();
}
# Returns slot() of the job's command line object.
sub slot {
    my ($self) = @_;
    my $commandline = $self->{'commandline'};
    return $commandline->slot();
}
{
# Cached script text (built on the first call)
my($cattail);
# Returns the text of a Perl script as a single line: the q{} text
# below with everything from a '#' to the end of the line removed and
# every run of whitespace collapsed to one space.
# The script takes ($cmd, $writerpid, $read_file, $unlink_file) as
# arguments. It copies what it can read from $read_file (or STDIN) to
# the stdin of $cmd, polling for more data, and exits when it is at
# EOF and the process $writerpid no longer exists.
sub cattail {
if(not $cattail) {
$cattail = q{
# cat followed by tail.
# If $writerpid dead: finish after this round
use Fcntl;
$|=1;
my ($cmd, $writerpid, $read_file, $unlink_file) = @ARGV;
if($read_file) {
open(IN,"<",$read_file) || die("cattail: Cannot open $read_file");
} else {
*IN = *STDIN;
}
my $flags;
fcntl(IN, F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
$flags |= O_NONBLOCK; # Add non-blocking to the flags
fcntl(IN, F_SETFL, $flags) || die $!; # Set the flags on the filehandle
open(OUT,"|-",$cmd) || die("cattail: Cannot run $cmd");
while(1) {
# clear EOF
seek(IN,0,1);
my $writer_running = kill 0, $writerpid;
$read = sysread(IN,$buf,32768);
if($read) {
# We can unlink the file now: The writer has written something
-e $unlink_file and unlink $unlink_file;
# Blocking print
while($buf) {
my $bytes_written = syswrite(OUT,$buf);
# syswrite may be interrupted by SIGHUP
substr($buf,0,$bytes_written) = "";
}
# Something printed: Wait less next time
$sleep /= 2;
} else {
if(eof(IN) and not $writer_running) {
# Writer dead: There will never be more to read => exit
exit;
}
# TODO This could probably be done more efficiently using select(2)
# Nothing read: Wait longer before next read
# Up to 30 milliseconds
$sleep = ($sleep < 30) ? ($sleep * 1.001 + 0.01) : ($sleep);
usleep($sleep);
}
}
sub usleep {
# Sleep this many milliseconds.
my $secs = shift;
select(undef, undef, undef, $secs/1000);
}
};
# Strip the comments first: once the newlines are gone a '#' would
# comment out the rest of the script
$cattail =~ s/#.*//mg;
# Collapse the script to a single line
$cattail =~ s/\s+/ /g;
}
return $cattail;
}
}
sub openoutputfiles {
# Open the files/handles that will receive the job's stdout and stderr
# and register them with set_fh() under fd numbers 1 and 2 using the
# keys 'w' (write handle), 'r' (read handle), 'name' (file name),
# 'unlink' (file to unlink), and with --compress 'wpid'/'rpid'.
# Uses:
#   $opt::results, $opt::ungroup, $opt::files, $opt::compress,
#   $opt::compress_program, $opt::decompress_program,
#   $opt::linebuffer, @Global::tee_jobs, %Global::fd, $Global::debug
my $self = shift;
my ($outfhw, $errfhw, $outname, $errname);
if($opt::results) {
# --results: output goes to $opt::results/<args as dirname>/stdout
# and .../stderr
my $args_as_dirname = $self->{'commandline'}->args_as_dirname();
my $dir = $opt::results."/".$args_as_dirname;
if(eval{ File::Path::mkpath($dir); }) {
# mkpath returned a true value: nothing more to do
} else {
# mkpath died or returned a false value.
# max_file_name_length() sets $Global::max_file_length, which
# args_as_dirname() uses to shorten each part of the name; then
# the directory is made again with the recomputed name.
# NOTE(review): mkpath presumably also returns false when the
# directory already exists - confirm whether the probe is then
# intended.
max_file_name_length($opt::results);
$args_as_dirname = $self->{'commandline'}->args_as_dirname();
$dir = $opt::results."/".$args_as_dirname;
File::Path::mkpath($dir);
}
$outname = "$dir/stdout";
if(not open($outfhw, "+>", $outname)) {
::error("Cannot write to `$outname'.\n");
::wait_and_exit(255);
}
$errname = "$dir/stderr";
if(not open($errfhw, "+>", $errname)) {
::error("Cannot write to `$errname'.\n");
::wait_and_exit(255);
}
# The result files are kept: register no file to unlink
$self->set_fh(1,"unlink","");
$self->set_fh(2,"unlink","");
} elsif(not $opt::ungroup) {
# Grouped output: buffer in temporary files
if(@Global::tee_jobs) {
# Nothing is opened here when @Global::tee_jobs is set
} elsif($opt::files) {
($outfhw, $outname) = ::tmpfile(SUFFIX => ".par");
($errfhw, $errname) = ::tmpfile(SUFFIX => ".par");
# --files: the stdout file is not registered for unlinking;
# the stderr file is
$self->set_fh(1,"unlink","");
$self->set_fh(2,"unlink",$errname);
} else {
($outfhw, $outname) = ::tmpfile(SUFFIX => ".par");
($errfhw, $errname) = ::tmpfile(SUFFIX => ".par");
$self->set_fh(1,"unlink",$outname);
$self->set_fh(2,"unlink",$errname);
}
} else {
# --ungroup: write directly to dups of the handles in
# $Global::fd{1} and $Global::fd{2}; there are no file names
open($outfhw,">&",$Global::fd{1}) || die;
open($errfhw,">&",$Global::fd{2}) || die;
$outname = "";
$errname = "";
$self->set_fh(1,"unlink",$outname);
$self->set_fh(2,"unlink",$errname);
}
$self->set_fh(1,'w',$outfhw);
$self->set_fh(2,'w',$errfhw);
$self->set_fh(1,'name',$outname);
$self->set_fh(2,'name',$errname);
if($opt::compress) {
# --compress: replace the write handle with a pipe into
# "$opt::compress_program >> file", and read through a perl process
# running the cattail() script with $opt::decompress_program.
my $cattail = cattail();
for my $fdno (1,2) {
my $wpid = open(my $fdw,"|-","$opt::compress_program >>".
$self->fh($fdno,'name')) || die $?;
$self->set_fh($fdno,'w',$fdw);
$self->set_fh($fdno,'wpid',$wpid);
# Arguments to the cattail script:
#   command, writer pid, file to read, file to unlink
my $rpid = open(my $fdr, "-|", "perl", "-e", $cattail,
$opt::decompress_program, $wpid,
$self->fh($fdno,'name'),$self->fh($fdno,'unlink')) || die $?;
$self->set_fh($fdno,'r',$fdr);
$self->set_fh($fdno,'rpid',$rpid);
}
} elsif(not $opt::ungroup) {
# Grouped, uncompressed: open a read handle on each file and (unless
# debugging) unlink the registered file right away
for my $fdno (1,2) {
open(my $fdr,"<", $self->fh($fdno,'name')) ||
::die_bug("fdr: Cannot open ".$self->fh($fdno,'name'));
$self->set_fh($fdno,'r',$fdr);
$Global::debug or unlink $self->fh($fdno,"unlink");
}
}
if($opt::linebuffer) {
# --linebuffer: make both read handles non-blocking.
# Fcntl is loaded on demand; the load is remembered in %Global::use
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
for my $fdno (1,2) {
my $fdr = $self->fh($fdno,'r');
my $flags;
fcntl($fdr, &F_GETFL, $flags) || die $!; $flags |= &O_NONBLOCK; fcntl($fdr, &F_SETFL, $flags) || die $!; }
}
}
sub max_file_name_length {
    # Find (to within 5 characters) how long a directory name can be
    # made in $testdir. Directories named "xxx..." are made and removed
    # in $testdir while testing.
    # Input:
    #   $testdir = directory in which the test directories are made
    # Sets:
    #   $Global::max_file_length = $min
    # Returns:
    #   $min = the longest length for which mkdir succeeded
    #     (8 if mkdir never succeeded: length 8 itself is not tested)
    my $testdir = shift;

    my $len = 8;
    my $dir = "x" x $len;
    # Multiply the length by 16 until mkdir fails.
    # Each round starts by removing the directory made in the previous
    # round, so nothing is left behind when the loop ends.
    do {
        rmdir($testdir."/".$dir);
        $len *= 16;
        $dir = "x" x $len;
    } while (mkdir $testdir."/".$dir);
    # The limit is between $len/16 and $len: find it by binary search
    my $min = $len/16;
    my $max = $len;
    while($max-$min > 5) {
        # Stop when within 5 characters of the exact value
        my $test = int(($min+$max)/2);
        $dir = "x" x $test;
        if(mkdir $testdir."/".$dir) {
            rmdir($testdir."/".$dir);
            $min = $test;
        } else {
            $max = $test;
        }
    }
    $Global::max_file_length = $min;
    return $min;
}
sub set_fh {
    # Store a value for a file descriptor number and key.
    # Input:
    #   $fd_no = file descriptor number
    #   $key = name of the property (e.g. 'w', 'r', 'name', 'unlink')
    #   $fh = value to store
    my ($self, $fd_no, $key, $fh) = @_;
    # Same composite key as the multi-dimensional form {$fd_no,$key}
    my $fd_key = join($;, $fd_no, $key);
    $self->{'fd'}{$fd_key} = $fh;
}
sub fh {
    # Input:
    #   $fd_no = file descriptor number
    #   $key = name of the property (e.g. 'w', 'r', 'name', 'unlink')
    # Returns:
    #   the value stored by set_fh() for ($fd_no,$key)
    my ($self, $fd_no, $key) = @_;
    # Same composite key as the multi-dimensional form {$fd_no,$key}
    my $fd_key = join($;, $fd_no, $key);
    return $self->{'fd'}{$fd_key};
}
sub write {
    # Write a string to the job's stdin (the 'w' handle of fd 0) with a
    # single syswrite.
    # Input:
    #   $remaining_ref = reference to the string to write
    my ($self, $remaining_ref) = @_;
    my $job_stdin = $self->fh(0,"w");
    syswrite($job_stdin, $$remaining_ref);
}
sub set_stdin_buffer {
    # Fill the buffer that non_block_write() writes to the job's stdin.
    # Input:
    #   $header_ref = ref to header; only prepended if the job is virgin
    #   $block_ref = ref to the block of data
    #   $endpos = number of chars of the block to use
    #   $recstart, $recend = record separators, passed to
    #     remove_rec_sep() if $opt::remove_rec_sep is set
    my ($self, $header_ref, $block_ref, $endpos, $recstart, $recend) = @_;
    my $header = $self->virgin() ? $$header_ref : "";
    $self->{'stdin_buffer'} = $header . substr($$block_ref,0,$endpos);
    $opt::remove_rec_sep
        and remove_rec_sep(\$self->{'stdin_buffer'},$recstart,$recend);
    $self->{'stdin_buffer_length'} = length $self->{'stdin_buffer'};
    $self->{'stdin_buffer_pos'} = 0;
}
sub stdin_buffer_length {
    # Returns:
    #   the buffer length recorded by set_stdin_buffer()
    my ($self) = @_;
    return $self->{'stdin_buffer_length'};
}
sub remove_rec_sep {
    # Remove the record separators from a block (in place).
    # $recstart and $recend are used as regexps. Because of /o they are
    # compiled the first time only.
    # Input:
    #   $block_ref = ref to the string to change
    #   $recstart = regexp matching the start of a record
    #   $recend = regexp matching the end of a record
    my ($block_ref,$recstart,$recend) = @_;
    for my $block ($$block_ref) {
        # Separators between records
        $block =~ s/$recend$recstart//gos;
        # Separator at the very start
        $block =~ s/^$recstart//os;
        # Separator at the very end
        $block =~ s/$recend$//os;
    }
}
sub non_block_write {
# Write the not yet written part of $self->{'stdin_buffer'} to the
# job's stdin (the 'w' handle of fd 0) with a single syswrite.
# Returns:
#   $something_written = number of bytes written by this call
#     (0 if the write failed with EAGAIN)
my $self = shift;
my $something_written = 0;
use POSIX qw(:errno_h);
# The one-element 'for' makes $buf refer to the remaining part of the
# buffer (from 'stdin_buffer_pos' to the end)
for my $buf (substr($self->{'stdin_buffer'},$self->{'stdin_buffer_pos'})) {
my $in = $self->fh(0,"w");
my $rv = syswrite($in, $buf);
if (!defined($rv) && $! == EAGAIN) {
# Would block: nothing written
$something_written = 0;
} elsif ($self->{'stdin_buffer_pos'}+$rv != $self->{'stdin_buffer_length'}) {
# Incomplete write: remember how far we got
# NOTE(review): a failed write with an errno other than EAGAIN also
# ends here with $rv undefined - confirm that is intended.
$self->{'stdin_buffer_pos'} += $rv;
$something_written = $rv;
} else {
# Everything is written: reset to an empty buffer
my $a="";
$self->set_stdin_buffer(\$a,\$a,"","");
$something_written = $rv;
}
}
::debug("pipe", "Non-block: ", $something_written);
return $something_written;
}
sub virgin {
    # Returns:
    #   the 'virgin' flag of the job (set to 1 by new())
    my ($self) = @_;
    return $self->{'virgin'};
}
sub set_virgin {
    # Input:
    #   $flag = new value of the job's 'virgin' flag
    my ($self, $flag) = @_;
    $self->{'virgin'} = $flag;
}
sub pid {
    # Returns:
    #   the pid stored by set_pid()
    my ($self) = @_;
    return $self->{'pid'};
}
sub set_pid {
    # Input:
    #   $pid = process id to store for the job
    my ($self, $pid) = @_;
    $self->{'pid'} = $pid;
}
sub starttime {
    # Returns:
    #   the stored start time formatted with 3 decimals
    my ($self) = @_;
    my $formatted = sprintf("%.3f",$self->{'starttime'});
    return $formatted;
}
sub set_starttime {
    # Input:
    #   $starttime = start time to store.
    #     If false: the value of ::now() is used
    my ($self, $starttime) = @_;
    $starttime ||= ::now();
    $self->{'starttime'} = $starttime;
}
sub runtime {
    # Returns:
    #   endtime() - starttime(), cut off (not rounded) at 3 decimals
    #   and formatted with 3 decimals
    my ($self) = @_;
    my $elapsed = $self->endtime() - $self->starttime();
    my $truncated = int($elapsed*1000)/1000;
    return sprintf("%.3f",$truncated);
}
sub endtime {
    # Returns:
    #   the stored end time, or 0 if none (or a false value) is stored
    my ($self) = @_;
    my $endtime = $self->{'endtime'} || 0;
    return $endtime;
}
sub set_endtime {
    # Input:
    #   $endtime = end time to store (may be undef)
    my ($self, $endtime) = @_;
    $self->{'endtime'} = $endtime;
}
sub timedout {
    # Input:
    #   $delta_time = seconds the job is allowed to run
    # Returns:
    #   true if the current time is later than the stored start time
    #   plus $delta_time
    my ($self, $delta_time) = @_;
    my $deadline = $self->{'starttime'} + $delta_time;
    return time > $deadline;
}
sub kill {
    # Kill the job: send signals to the job's pid and all its
    # descendants (see family_pids()). The job's exit status is set
    # to -1.
    # Input:
    #   @signals = signals to send: each signal is sent to all pids that
    #     are still alive, one signal right after the other.
    #     If empty: TERM, TERM, KILL is sent; after each TERM that
    #     reached a live process the job's pid is polled until it is
    #     gone or the summed sleep values reach 200.
    my ($self, @signals) = @_;
    my @family_pids = $self->family_pids();
    # Record this job as failed
    $self->set_exitstatus(-1);
    ::debug("run", "Kill seq ", $self->seq(), "\n");
    # Do not use '@signals || (...)': '||' evaluates @signals in scalar
    # context, so the number of signals would be sent as the signal.
    my @send_signals = @signals ? @signals : ("TERM", "TERM", "KILL");
    for my $signal (@send_signals) {
        my $alive = 0;
        for my $pid (@family_pids) {
            # CORE::kill: the built-in, not this method
            if(CORE::kill 0, $pid) {
                # The process still exists
                CORE::kill $signal, $pid;
                $alive = 1;
            }
        }
        # Signals given by the caller: no waiting between them
        @signals and next;
        if($signal eq "TERM" and $alive) {
            # Give the processes time to clean up
            my $sleep = 1;
            for (my $sleepsum = 0;
                 CORE::kill 0, $family_pids[0] and $sleepsum < 200;
                 $sleepsum += $sleep) {
                $sleep = ::reap_usleep($sleep);
            }
        }
    }
}
sub family_pids {
    # Find the pids of the job's process and all its descendants using
    # the children table from ::pid_table().
    # Returns:
    #   @family = the job's pid followed by its children, then their
    #     children, and so on
    my ($self) = @_;
    my $pid = $self->pid();
    # Only the first table (pid => [children]) is needed
    my ($children_of_ref) = ::pid_table();
    my @family;
    my @generation = ($pid);
    while(@generation) {
        push @family, @generation;
        # Next round: the children of this generation
        @generation = map { @{ $children_of_ref->{$_} || [] } } @generation;
    }
    return (@family);
}
sub failed {
    # Input:
    #   $sshlogin = key of the sshlogin to look up
    # Returns:
    #   the failure count recorded for $sshlogin (undef if none)
    my ($self, $sshlogin) = @_;
    return $self->{'failed'}{$sshlogin};
}
sub failed_here {
    # Returns:
    #   the failure count recorded for the job's current sshlogin
    my ($self) = @_;
    my $here = $self->sshlogin();
    return $self->{'failed'}{$here};
}
sub add_failed {
    # Add one to the failure count of an sshlogin.
    # Input:
    #   $sshlogin = key of the sshlogin that failed
    my ($self, $sshlogin) = @_;
    $self->{'failed'}{$sshlogin}++;
}
sub add_failed_here {
    # Add one to the failure count of the job's current sshlogin.
    my ($self) = @_;
    my $here = $self->sshlogin();
    $self->{'failed'}{$here}++;
}
sub reset_failed {
    # Forget the failures recorded for an sshlogin.
    # Input:
    #   $sshlogin = key of the sshlogin to forget
    my ($self, $sshlogin) = @_;
    delete $self->{'failed'}{$sshlogin};
}
sub reset_failed_here {
    # Forget the failures recorded for the job's current sshlogin.
    my ($self) = @_;
    my $here = $self->sshlogin();
    delete $self->{'failed'}{$here};
}
sub min_failed {
    # Returns:
    #   ($number_of_sshlogins_failed_on, $min_failures) =
    #     the number of sshlogins with recorded failures and the
    #     smallest failure count among them (as computed by ::min)
    my ($self) = @_;
    my @failed_on = keys %{$self->{'failed'}};
    my $min_failures = ::min(map { $self->{'failed'}{$_} } @failed_on);
    my $number_of_sshlogins_failed_on = scalar @failed_on;
    return ($number_of_sshlogins_failed_on,$min_failures);
}
sub total_failed {
    # Returns:
    #   the sum of the failure counts of all sshlogins
    my ($self) = @_;
    my $total_failures = 0;
    $total_failures += $_ for values %{$self->{'failed'}};
    return $total_failures;
}
sub wrapped {
# Wrap the command with the shell code needed for the options in use.
# The result is cached in $self->{'wrapped'} (set_sshlogin() clears it).
# Uses:
#   $Global::envvar, $Global::shell, @Global::cat_partials,
#   $opt::shellquote, $opt::nice, $opt::cat, $opt::fifo, $opt::pipe,
#   $opt::tmux
# Returns:
#   $self->{'wrapped'} = the command to hand to the shell
my $self = shift;
if(not defined $self->{'wrapped'}) {
my $command = $Global::envvar.$self->replaced();
if($opt::shellquote) {
# --shellquote: do not run the command, echo it quoted twice
$command = "echo " .
::shell_quote_scalar(::shell_quote_scalar($command));
}
if($opt::nice) {
# --nice: run the command in a new shell under nice
$command = '\nice'. " -n". $opt::nice. " ".
$Global::shell. " -c ".
::shell_quote_scalar($command);
}
# \257<\257> below is an empty placeholder that replace_placeholders()
# fills in from the job's arguments
if($opt::cat) {
# --cat: save stdin to the file, run the command, remove the file,
# and exit with the exit value of the command
$command =
$self->{'commandline'}->replace_placeholders(["cat > \257<\257>; "], 0, 0).
$command.
$self->{'commandline'}->replace_placeholders(
["; _EXIT=\$?; rm \257<\257>; exit \$_EXIT"], 0, 0);
} elsif($opt::fifo) {
# --fifo: make the fifo, start the command in the background,
# copy stdin to the fifo, wait for the command, remove the fifo,
# and exit with the exit value of the command
$command =
$self->{'commandline'}->replace_placeholders(["mkfifo \257<\257>; ("], 0, 0).
$command.
$self->{'commandline'}->replace_placeholders([") & _PID=\$!; cat > \257<\257>; ",
"wait \$_PID; _EXIT=\$?; ",
"rm \257<\257>; exit \$_EXIT"],
0,0);
}
# Add what is needed for running on the sshlogin
$command = $self->sshlogin_wrap($command);
if(@Global::cat_partials) {
# Use the next entry of @Global::cat_partials as the producer of
# the command's stdin
$command = (shift @Global::cat_partials). "|". "(". $command. ")";
} elsif($opt::pipe) {
# --pipe: read one byte from stdin into $tmpfile.
# If $tmpfile is empty (no input): remove it and run 'true' instead
# of the command. Otherwise run the command with that byte followed
# by the rest of stdin as its input.
my ($dummy_fh, $tmpfile) = ::tmpfile(SUFFIX => ".chr");
unlink $tmpfile;
$command =
qq{ sh -c 'dd bs=1 count=1 of=$tmpfile 2>/dev/null'; }.
qq{ test \! -s "$tmpfile" && rm -f "$tmpfile" && exec true; }.
qq{ (cat $tmpfile; rm $tmpfile; cat - ) | }.
"($command);";
}
if($opt::tmux) {
# --tmux: run the command in a tmux window
$command = $self->tmux_wrap($command);
}
$self->{'wrapped'} = $command;
}
return $self->{'wrapped'};
}
sub set_sshlogin {
    # Set the sshlogin the job will run on.
    # The cached results of sshlogin_wrap() and wrapped() are dropped,
    # so they are recomputed for the new sshlogin.
    # Input:
    #   $sshlogin = the sshlogin to store
    my ($self, $sshlogin) = @_;
    $self->{'sshlogin'} = $sshlogin;
    delete @{$self}{'sshlogin_wrap', 'wrapped'};
}
sub sshlogin {
    # Returns:
    #   the sshlogin stored by set_sshlogin()
    my ($self) = @_;
    return $self->{'sshlogin'};
}
sub sshlogin_wrap {
# Wrap the command with the commands needed to run it on the job's
# sshlogin: transfer of files, ssh, return of files and cleanup.
# The result is cached in $self->{'sshlogin_wrap'}: if a cached value
# exists, $command is ignored (set_sshlogin() clears the cache).
# Input:
#   $command = the command to run
# Uses:
#   $Global::envwarn, $opt::pipe, $opt::pipepart, $opt::ctrlc,
#   $opt::noctrlc, $opt::workdir
# Returns:
#   $self->{'sshlogin_wrap'} = the wrapped command
my $self = shift;
my $command = shift;
if(not defined $self->{'sshlogin_wrap'}) {
my $sshlogin = $self->sshlogin();
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my ($pre,$post,$cleanup)=("","","");
if($serverlogin eq ":") {
# ':' as serverlogin: the command is used unchanged
$self->{'sshlogin_wrap'} = $command;
} else {
# Commands run before the ssh (transfer) and after it (return of
# files, cleanup)
$pre .= $self->sshtransfer();
$post .= $self->sshreturn();
$post .= $self->sshcleanup();
if($post) {
# Save the exit value of the ssh so it is still the exit value after
# the commands in $post have run
$post = '_EXIT_status=$?; ' . $post . ' exit $_EXIT_status;';
}
# Shell snippet run on the server: if $SHELL matches csh/tcsh it
# generates 'setenv' commands, otherwise 'VAR=...; export VAR', for
# PARALLEL_SEQ and PARALLEL_PID, and evals them. The values are
# inserted by the local shell.
my $parallel_env =
($Global::envwarn
. q{ 'eval `echo $SHELL | grep "/t\\{0,1\\}csh" > /dev/null }
. q{ && echo setenv PARALLEL_SEQ '$PARALLEL_SEQ'\; }
. q{ setenv PARALLEL_PID '$PARALLEL_PID' }
. q{ || echo PARALLEL_SEQ='$PARALLEL_SEQ'\;export PARALLEL_SEQ\; }
. q{ PARALLEL_PID='$PARALLEL_PID'\;export PARALLEL_PID` ;' });
my $remote_pre = "";
my $ssh_options = "";
# Ask ssh for a tty if: (--pipe or --pipepart) with --ctrlc,
# or neither --pipe nor --pipepart and no --noctrlc
if(($opt::pipe or $opt::pipepart) and $opt::ctrlc
or
not ($opt::pipe or $opt::pipepart) and not $opt::noctrlc) {
$ssh_options = "-tt -oLogLevel=quiet";
$remote_pre .= ::shell_quote_scalar('tty >/dev/null && stty isig -onlcr -echo;');
}
if($opt::workdir) {
# --workdir: make the dir on the server and cd to it; give up if the
# cd fails
my $wd = ::shell_quote_file($self->workdir());
$remote_pre .= ::shell_quote_scalar("mkdir -p ") . $wd .
::shell_quote_scalar("; cd ") . $wd .
::shell_quote_scalar(qq{ || exec false;});
}
# NOTE(review): $signal_script is built here but is not used in the
# command assembled below - confirm whether it is meant to be part of
# the wrapped command.
my $signal_script = "perl -e '".
q{
use IO::Poll;
$SIG{CHLD} = sub { $done = 1 };
$p = IO::Poll->new;
$p->mask(STDOUT, POLLHUP);
$pid=fork; unless($pid) {setpgrp; exec $ENV{SHELL}, "-c", @ARGV; die "exec: $!\n"}
$p->poll;
kill SIGHUP, -${pid} unless $done;
wait; exit ($?&127 ? 128+($?&127) : 1+$?>>8)
} . "' ";
$signal_script =~ s/\s+/ /g;
# <transfer> ssh <options> <server> <env setup> <remote setup>
# <quoted command>; <return, cleanup>
$self->{'sshlogin_wrap'} =
($pre
. "$sshcmd $ssh_options $serverlogin $parallel_env "
. $remote_pre
. ::shell_quote_scalar($command)
. ";"
. $post);
}
}
return $self->{'sshlogin_wrap'};
}
sub transfer {
    # Find the files to transfer and sum their sizes.
    # Uses:
    #   $opt::transfer
    # Sets:
    #   $self->{'transfersize'} = summed size of the arguments that
    #     exist as files (0 if --transfer is not used)
    # Returns:
    #   @files = orig() of every argument of the job if --transfer is
    #     used; otherwise the empty list
    my ($self) = @_;
    my @files = ();
    $self->{'transfersize'} = 0;
    $opt::transfer or return @files;
    for my $arg (map { @$_ } @{$self->{'commandline'}{'arg_list'}}) {
        CORE::push @files, $arg->orig();
        -e $arg->orig() or next;
        $self->{'transfersize'} += (stat($arg->orig()))[7];
    }
    return @files;
}
sub transfersize {
    # Returns:
    #   the size computed by the latest call to transfer()
    my ($self) = @_;
    return $self->{'transfersize'};
}
sub sshtransfer {
    # Returns:
    #   the commands for transferring the files from transfer() to the
    #   job's workdir on the sshlogin: one rsync_transfer_cmd() per
    #   file, each followed by ';'. Empty string if there are no files.
    my ($self) = @_;
    my $sshlogin = $self->sshlogin();
    my $workdir = $self->workdir();
    return join("",
                map { $sshlogin->rsync_transfer_cmd($_,$workdir).";" }
                $self->transfer());
}
sub return {
    # Returns:
    #   the 'return_files' of the command line with the placeholders
    #   replaced (as done by replace_placeholders() without quoting)
    my ($self) = @_;
    my $commandline = $self->{'commandline'};
    return $commandline->replace_placeholders($commandline->{'return_files'},0,0);
}
sub returnsize {
    # Sum the sizes of the return files (see return()) that exist.
    # The sum is computed from scratch on every call, so calling this
    # more than once does not count the same files again.
    # Sets:
    #   $self->{'returnsize'}
    # Returns:
    #   $self->{'returnsize'} = summed size of the existing return files
    my ($self) = @_;
    my $size = 0;
    for my $file ($self->return()) {
        if(-e $file) {
            $size += (stat($file))[7];
        }
    }
    $self->{'returnsize'} = $size;
    return $self->{'returnsize'};
}
sub sshreturn {
    # Build the commands for fetching the return files (see return())
    # from the sshlogin with rsync.
    # Returns:
    #   $pre = for each return file: 'mkdir -p <dir>; <rsync command>;'
    #     Empty string if there are no return files.
    my $self = shift;
    my $sshlogin = $self->sshlogin();
    my $sshcmd = $sshlogin->sshcommand();
    my $serverlogin = $sshlogin->serverlogin();
    my $rsync_opt = "-rlDzR -e".::shell_quote_scalar($sshcmd);
    my $pre = "";
    for my $file ($self->return()) {
        # Remove a leading ./
        $file =~ s:^\./::g;
        # Relative paths are relative to the workdir on the server
        my $relpath = ($file !~ m:^/:);
        my $wd = "";
        if($relpath) {
            $wd = ::shell_quote_file($self->workdir()."/");
        }
        # Load File::Basename on demand (needed for basename())
        $Global::use{"File::Basename"} ||= eval "use File::Basename; 1;";
        # The part before the last /./ is the local base dir.
        # Take the capture from the match itself: if the match fails $1
        # would still hold the capture of an earlier match (possibly
        # one done by a caller).
        my ($before_dot) = $file =~ m:(.*)/\./:;
        my $basedir = $before_dot ? ::shell_quote_file($before_dot."/") : "";
        # The part after the last /./ (the whole name if there is none)
        my $nobasedir = $file;
        $nobasedir =~ s:.*/\./::;
        my $cd = ::shell_quote_file(::dirname($nobasedir));
        # Make the rsync on the server start in the right dir
        my $rsync_cd = '--rsync-path='.::shell_quote_scalar("cd $wd$cd; rsync");
        my $basename = ::shell_quote_scalar(::shell_quote_file(basename($file)));
        $pre .= "mkdir -p $basedir$cd; ".$sshlogin->rsync()." $rsync_cd $rsync_opt $serverlogin:".
            $basename . " ".$basedir.$cd.";";
    }
    return $pre;
}
sub sshcleanup {
    # Build the commands that remove files on the sshlogin after the
    # job has run.
    # Uses:
    #   $opt::workdir
    # Returns:
    #   $cleancmd = one cleanup_cmd() per file from cleanup(), each
    #     followed by ';'. With --workdir ... also an 'rm -rf' of the
    #     workdir run through ssh.
    my $self = shift;
    my $sshlogin = $self->sshlogin();
    my $sshcmd = $sshlogin->sshcommand();
    my $serverlogin = $sshlogin->serverlogin();
    my $workdir = $self->workdir();
    my $cleancmd = "";

    for my $file ($self->cleanup()) {
        $cleancmd .= $sshlogin->cleanup_cmd($file,$workdir).";";
    }
    if(defined $opt::workdir and $opt::workdir eq "...") {
        # The workdir was made for this job: remove all of it
        $cleancmd .= "$sshcmd $serverlogin rm -rf " . ::shell_quote_scalar($workdir).';';
    }
    return $cleancmd;
}
sub cleanup {
    # Uses:
    #   $opt::cleanup
    # Returns:
    #   the files from transfer() followed by the files from return()
    #   if --cleanup is used; otherwise the empty list
    my ($self) = @_;
    $opt::cleanup or return ();
    my @sent = $self->transfer();
    my @fetched = $self->return();
    return (@sent,@fetched);
}
sub workdir {
    # Compute the working dir of the job. The value is computed once
    # and cached in $self->{'workdir'}.
    # Uses:
    #   $opt::workdir, $ENV{'HOME'}
    # Returns:
    #   $self->{'workdir'} = the dir passed through ::shell_quote_scalar
    my ($self) = @_;
    defined $self->{'workdir'} and return $self->{'workdir'};
    my $workdir;
    if(not defined $opt::workdir) {
        $workdir = ".";
    } elsif($opt::workdir eq ".") {
        # . means current dir, given relative to $HOME if the current
        # dir is below $HOME
        my $home = $ENV{'HOME'};
        eval 'use Cwd';
        my $cwd = cwd();
        $workdir = $cwd;
        if($home) {
            # Walk down the parts of $cwd. When a parent is the same
            # dir as $HOME (same device and inode) the remaining parts
            # are the path relative to $HOME.
            my ($home_dev, $home_ino) = (stat($home))[0,1];
            my $parent = "";
            my @dir_parts = split(m:/:,$cwd);
            while(@dir_parts) {
                my $part = shift @dir_parts;
                $part eq "" and next;
                $parent .= "/".$part;
                my ($parent_dev, $parent_ino) = (stat($parent))[0,1];
                if($parent_dev == $home_dev and $parent_ino == $home_ino) {
                    $workdir = join("/",@dir_parts);
                    last;
                }
            }
        }
        # The current dir is $HOME itself
        $workdir eq "" and $workdir = ".";
    } elsif($opt::workdir eq "...") {
        # ... means a dir unique to this job
        $workdir = ".parallel/tmp/" . ::hostname() . "-" . $$
            . "-" . $self->seq();
    } else {
        $workdir = $opt::workdir;
        # Rsync treats /./ specially: it cannot be part of the workdir
        for my $dir ($workdir) {
            $dir =~ s:/\./:/:g;
            # Remove trailing /
            $dir =~ s:/+$::;
            # Remove leading ./
            $dir =~ s:^\./::g;
        }
    }
    $self->{'workdir'} = ::shell_quote_scalar($workdir);
    return $self->{'workdir'};
}
sub parentdirs_of {
    # Input:
    #   $path = file or dir name
    # Returns:
    #   @parents = the parent dirs of $path, the closest parent first.
    #     A parent that is just "." is left out.
    my ($path) = @_;
    my @parents;
    # Chop off the last /component until there is none left
    while($path =~ s:/[^/]+$::) {
        $path eq "." or push @parents, $path;
    }
    return @parents;
}
sub start {
# Start the job: open the output files and spawn the wrapped command
# through $Global::shell with open3.
# Uses:
#   $Global::interactive, $Global::stderr_verbose, $Global::verbose,
#   $Global::original_stderr, $Global::original_stdin, $Global::shell,
#   $Global::stdin_in_opt_a, $Global::tty_taken,
#   $opt::dryrun, $opt::ungroup, $opt::pipe, @opt::a, $opt::tty,
#   $opt::timeout
# Sets (if the job was started):
#   $Global::total_running, $Global::total_started, %Global::running,
#   $Global::newest_job, $Global::newest_starttime
# Returns:
#   $job if the job was started
#   undef if the job could not be spawned
my $job = shift;
my $command = $job->wrapped();
if($Global::interactive or $Global::stderr_verbose) {
if($Global::interactive) {
# Ask on the terminal; anything not starting with y/Y means:
# run 'true' instead of the command
print $Global::original_stderr "$command ?...";
open(my $tty_fh, "<", "/dev/tty") || ::die_bug("interactive-tty");
my $answer = <$tty_fh>;
close $tty_fh;
my $run_yes = ($answer =~ /^\s*y/i);
if (not $run_yes) {
$command = "true"; }
} else {
print $Global::original_stderr "$command\n";
}
}
my $pid;
$job->openoutputfiles();
my($stdout_fh,$stderr_fh) = ($job->fh(1,"w"),$job->fh(2,"w"));
# OUT and ERR are dups of the job's output handles. They are given to
# open3 below as ">&OUT" and ">&ERR".
local (*IN,*OUT,*ERR);
open OUT, '>&', $stdout_fh or ::die_bug("Can't redirect STDOUT: $!");
# NOTE(review): the message below says STDOUT but the handle is the
# job's stderr.
open ERR, '>&', $stderr_fh or ::die_bug("Can't dup STDOUT: $!");
if(($opt::dryrun or $Global::verbose) and $opt::ungroup) {
# With --ungroup the command is printed here (print() returns early
# for --ungroup)
if($Global::verbose <= 1) {
print $stdout_fh $job->replaced(),"\n";
} else {
# Verbose level > 1: print the command as it is actually run
print $stdout_fh $command,"\n";
}
}
if($opt::dryrun) {
# --dryrun: run 'true' instead of the command
$command = "true";
}
$ENV{'PARALLEL_SEQ'} = $job->seq();
$ENV{'PARALLEL_PID'} = $$;
::debug("run", $Global::total_running, " processes . Starting (",
$job->seq(), "): $command\n");
# In all four cases below a failing open3 is caught by eval, leaving
# $pid undefined.
if($opt::pipe) {
# --pipe: keep a handle to the job's stdin so data can be written
# to it
my ($stdin_fh);
eval {
$pid = ::open3($stdin_fh, ">&OUT", ">&ERR", $Global::shell, "-c", $command) ||
::die_bug("open3-pipe");
1;
};
$job->set_fh(0,"w",$stdin_fh);
} elsif(@opt::a and not $Global::stdin_in_opt_a and $job->seq() == 1
and $job->sshlogin()->string() eq ":") {
# Input comes from -a files and stdin is not one of them: give the
# real stdin to the first job when it runs on ':', then reopen
# STDIN from the saved original
*IN = *STDIN;
eval {
$pid = ::open3("<&IN", ">&OUT", ">&ERR", $Global::shell, "-c", $command) ||
::die_bug("open3-a");
1;
};
open(STDIN, "<&", $Global::original_stdin)
or ::die_bug("dup-\$Global::original_stdin: $!");
} elsif ($opt::tty and not $Global::tty_taken and -c "/dev/tty" and
open(my $devtty_fh, "<", "/dev/tty")) {
# --tty: give /dev/tty as stdin to one job; $Global::tty_taken
# records the pid of the job that has it
*IN = $devtty_fh;
eval {
$pid = ::open3("<&IN", ">&OUT", ">&ERR", $Global::shell, "-c", $command) ||
::die_bug("open3-/dev/tty");
$Global::tty_taken = $pid;
close $devtty_fh;
1;
};
} else {
# Default: the job gets a new handle (that is never written to)
# as stdin
eval {
$pid = ::open3(::gensym, ">&OUT", ">&ERR", $Global::shell, "-c", $command) ||
::die_bug("open3-gensym");
1;
};
}
if($pid) {
# The job was started: do the bookkeeping
$Global::total_running++;
$Global::total_started++;
$job->set_pid($pid);
$job->set_starttime();
$Global::running{$job->pid()} = $job;
if($opt::timeout) {
$Global::timeoutq->insert($job);
}
$Global::newest_job = $job;
$Global::newest_starttime = ::now();
return $job;
} else {
# No more processes could be spawned
::debug("run", "Cannot spawn more jobs.\n");
return undef;
}
}
sub tmux_wrap {
    # Wrap the command so it runs in a tmux window.
    # The exit value of the command is passed back through a fifo.
    # Input:
    #   $actual_command = the command to run in tmux
    # Uses:
    #   $Global::total_running, $Global::original_stderr, %Global::unlink
    # Returns:
    #   the command that makes the fifo, starts tmux and exits with
    #   the first line read from the fifo
    my ($self, $actual_command) = @_;
    # Only the name is needed: the file is replaced by a fifo when the
    # command is run
    my ($fifo_fh, $fifo) = ::tmpfile(SUFFIX => ".tmx");
    $Global::unlink{$fifo}=1;
    close $fifo_fh;
    unlink $fifo;
    my $visual_command = $self->replaced();
    # The window title is the command with some characters removed
    (my $title = $visual_command) =~ tr/[\011-\016;\302-\365]//d;
    my $quoted_title = ::shell_quote_scalar($title);
    my $tmux;
    if($Global::total_running == 0) {
        # Nothing is running: start a new session
        $tmux = "tmux new-session -s p$$ -d -n ".$quoted_title;
        print $Global::original_stderr "See output with: tmux attach -t p$$\n";
    } else {
        $tmux = "tmux new-window -t p$$ -n ".$quoted_title;
    }
    # Run inside tmux: the command, then write its exit value (and 255
    # as a second line) to the fifo in the background, then show
    # what was run and keep the window for 10 seconds
    my $in_window = "(".$actual_command.');(echo $?$status;echo 255) >'.$fifo."&".
        "echo ".::shell_quote_scalar($visual_command).";".
        "echo \007Job finished at: `date`;sleep 10";
    my $quoted_in_window = ::shell_quote_scalar($in_window);
    return "mkfifo $fifo; $tmux ".
        $quoted_in_window.
        "; exit `perl -ne 'unlink \$ARGV; 1..1 and print' $fifo` ";
}
sub is_already_in_results {
    # Uses:
    #   $opt::results
    # Returns:
    #   true if the file stdout exists in the job's dir under
    #   $opt::results
    my ($job) = @_;
    my $stdout_file = $opt::results."/".
        $job->{'commandline'}->args_as_dirname()."/stdout";
    ::debug("run", "Test $stdout_file", -e $stdout_file, "\n");
    return -e $stdout_file;
}
sub is_already_in_joblog {
    # Uses:
    #   $Global::job_already_run = bit vector indexed by seq
    # Returns:
    #   the bit for the job's seq (1 if marked as run, else 0)
    my ($job) = @_;
    my $seq = $job->seq();
    return vec($Global::job_already_run,$seq,1);
}
sub set_job_in_joblog {
    # Mark the job's seq as run.
    # Sets:
    #   the bit for the job's seq in $Global::job_already_run
    my ($job) = @_;
    my $seq = $job->seq();
    vec($Global::job_already_run,$seq,1) = 1;
}
sub should_be_retried {
    # Decide if the job should be run again and, if so, put it back in
    # the job queue.
    # Uses:
    #   $opt::retries, $Global::JobQueue
    # Returns:
    #   0 = do not retry: --retries is not used, the job succeeded, or
    #       the total number of failures equals $opt::retries
    #   1 = the job has been put back in the queue
    my ($self) = @_;
    $opt::retries or return 0;
    if(not $self->exitstatus()) {
        # The job completed without failing: forget the failures here
        $self->reset_failed_here();
        return 0
    }
    # The job failed
    $self->add_failed_here();
    $self->total_failed() == $opt::retries and return 0;
    # Not yet retried enough: run it again
    $self->set_endtime(undef);
    $Global::JobQueue->unget($self);
    ::debug("run", "Retry ", $self->seq(), "\n");
    return 1;
}
sub print {
# Print the output of the job: for each file descriptor in %Global::fd
# (except 0) copy what the job wrote to the matching output handle.
# Uses:
#   $opt::dryrun, $opt::pipe, $opt::ungroup, $opt::files,
#   $opt::linebuffer, $opt::tag, $opt::tagstring,
#   $Global::joblog, $Global::verbose, %Global::fd, @command
my $self = shift;
::debug("print", ">>joboutput ", $self->replaced(), "\n");
if($opt::dryrun) {
# Nothing was run: the stdout file is not needed
unlink $self->fh(1,"name");
}
if($opt::pipe and $self->virgin()) {
# --pipe job that is still virgin: skip the joblog and the verbose
# output
} else {
if($Global::joblog and defined $self->{'exitstatus'}) {
# Only log when the exit status is known
$self->print_joblog();
}
# --ungroup: the output is not buffered, so there is nothing to print
$opt::ungroup and return;
exit_if_disk_full();
if(($opt::dryrun or $Global::verbose)
and
not $self->{'verbose_printed'}) {
# Print the command once per job
$self->{'verbose_printed'}++;
if($Global::verbose <= 1) {
print STDOUT $self->replaced(),"\n";
} else {
# Verbose level > 1: print the wrapped command
print STDOUT $self->wrapped(),"\n";
}
flush STDOUT;
}
}
for my $fdno (sort { $a <=> $b } keys %Global::fd) {
# fd 0 is stdin: there is no output to print
$fdno == 0 and next;
my $out_fd = $Global::fd{$fdno};
my $in_fh = $self->fh($fdno,"r");
if(not $in_fh) {
# The job has no read handle for this fd.
# The count in %Job::file_descriptor_warning_printed is increased,
# but no warning is printed here.
if(not $Job::file_descriptor_warning_printed{$fdno}++) {
}
next;
}
::debug("print", "File descriptor $fdno (", $self->fh($fdno,"name"), "):");
if($opt::files) {
# --files: print the name of the stdout file instead of the content
close $self->fh($fdno,"w");
close $in_fh;
if($opt::pipe and $self->virgin()) {
# --pipe job that is still virgin: remove its files
for my $fdno (1,2) {
unlink $self->fh($fdno,"name");
unlink $self->fh($fdno,"unlink");
}
} elsif($fdno == 1 and $self->fh($fdno,"name")) {
print $out_fd $self->fh($fdno,"name"),"\n";
}
} elsif($opt::linebuffer) {
# --linebuffer: print the full lines available so far
$self->linebuffer_print($fdno,$in_fh,$out_fd);
} else {
# Grouped output: the job is done, so print the whole file
my $buf;
close $self->fh($fdno,"w");
seek $in_fh, 0, 0;
if($opt::tag or defined $opt::tagstring) {
# Prepend the tag to every line
my $tag = $self->tag();
if($fdno == 2) {
# Only the first line of stderr is read by this loop (note the
# 'last'): it is dropped if it is the tcgetattr message
while(<$in_fh>) {
if(/^(client_process_control: )?tcgetattr: Invalid argument\n/) {
} else {
print $out_fd $tag,$_;
}
last;
}
}
while(<$in_fh>) {
print $out_fd $tag,$_;
}
} else {
my $buf;
if($fdno == 2) {
# Remove the tcgetattr message if the first (up to) 1000 bytes
# of stderr start with it
sysread($in_fh,$buf,1_000);
$buf =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//;
print $out_fd $buf;
}
# Copy the rest in blocks of 32768 bytes
while(sysread($in_fh,$buf,32768)) {
print $out_fd $buf;
}
}
close $in_fh;
}
flush $out_fd;
}
::debug("print", "<<joboutput @command\n");
}
sub linebuffer_print {
# Print the full lines that are available from the job so far.
# What comes after the last newline is kept in
# $self->{'partial_line',$fdno} until the next call; it is printed
# when the job has an exit status.
# Input:
#   $fdno = file descriptor number
#   $in_fh = handle to read the job's output from
#   $out_fd = handle to print to
# Uses:
#   $opt::compress, $opt::tag, $opt::tagstring
my $self = shift;
my ($fdno,$in_fh,$out_fd) = @_;
my $partial = \$self->{'partial_line',$fdno};
if(defined $self->{'exitstatus'}) {
# The job is done: close the write handle
close $self->fh($fdno,"w");
if($opt::compress) {
# Clear O_NONBLOCK (set by openoutputfiles() with --linebuffer) on
# both read handles
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
for my $fdno (1,2) {
my $fdr = $self->fh($fdno,'r');
my $flags;
fcntl($fdr, &F_GETFL, $flags) || die $!; $flags &= ~&O_NONBLOCK; fcntl($fdr, &F_SETFL, $flags) || die $!; }
}
}
# Seek to the current position (presumably to clear the EOF condition)
seek $in_fh, tell($in_fh), 0;
# Append what can be read to the end of $$partial
while(read($in_fh,substr($$partial,length $$partial),3276800)) {
# Find the last newline
my $i = rindex($$partial,"\n");
if($i != -1) {
# There is at least one full line
if($fdno == 2 and not $self->{'printed_first_line',$fdno}++) {
# First output on stderr: remove the tcgetattr message
$$partial =~ s/^(client_process_control: )?tcgetattr: Invalid argument\n//;
# The position of the last newline has changed
$i = rindex($$partial,"\n");
}
if($opt::tag or defined $opt::tagstring) {
# Prepend the tag to every full line
my $tag = $self->tag();
substr($$partial,0,$i+1) =~ s/^/$tag/gm;
# The position of the last newline has changed
$i = rindex($$partial,"\n");
}
# Print the full lines and remove them from $$partial
print $out_fd substr($$partial,0,$i+1);
substr($$partial,0,$i+1)="";
}
}
if(defined $self->{'exitstatus'}) {
# The job is done: print the rest (a last line without newline)
if($$partial and ($opt::tag or defined $opt::tagstring)) {
my $tag = $self->tag();
$$partial =~ s/^/$tag/gm;
}
print $out_fd $$partial;
$$partial = undef;
if($self->fh($fdno,"rpid") and CORE::kill 0, $self->fh($fdno,"rpid")) {
# The reader process (rpid) is still running: keep the handle open
} else {
close $in_fh;
}
}
}
sub print_joblog {
    # Append one tab separated line about the job to the joblog and
    # mark the job as run.
    # Fields: seq, sshlogin, start time, runtime, transfer size,
    #   return size, exit status, exit signal, command
    # Uses:
    #   $Global::joblog, $Global::verbose, @command
    my ($self) = @_;
    # Verbose level > 1 logs "@command"; otherwise the replaced command
    my $cmd = ($Global::verbose <= 1) ? $self->replaced() : "@command";
    my @fields = ($self->seq(), $self->sshlogin()->string(),
                  $self->starttime(), sprintf("%10.3f",$self->runtime()),
                  $self->transfersize(), $self->returnsize(),
                  $self->exitstatus(), $self->exitsignal(), $cmd);
    print $Global::joblog join("\t", @fields). "\n";
    flush $Global::joblog;
    $self->set_job_in_joblog();
}
sub tag {
    # Compute the tag that is put in front of each output line.
    # The tag is computed once and cached in $self->{'tag'}.
    # Uses:
    #   $opt::tagstring
    # Returns:
    #   $self->{'tag'} = $opt::tagstring with the placeholders replaced,
    #     followed by a TAB
    my ($self) = @_;
    defined $self->{'tag'} and return $self->{'tag'};
    my $commandline = $self->{'commandline'};
    $self->{'tag'} =
        $commandline->replace_placeholders([$opt::tagstring],0,0)."\t";
    return $self->{'tag'};
}
sub hostgroups {
    # Returns:
    #   the list referenced by the 'hostgroups' field of the first
    #   argument of the job's first record (cached in
    #   $self->{'hostgroups'})
    my ($self) = @_;
    unless(defined $self->{'hostgroups'}) {
        my $first_arg = $self->{'commandline'}->{'arg_list'}[0][0];
        $self->{'hostgroups'} = $first_arg->{'hostgroups'};
    }
    return @{$self->{'hostgroups'}};
}
sub exitstatus {
    # Returns:
    #   the exit status stored by set_exitstatus() (undef if none)
    my ($self) = @_;
    return $self->{'exitstatus'};
}
sub set_exitstatus {
    # Store the exit status of the job.
    # A true (non-zero) status always overwrites the stored one.
    # A false status is only stored if the stored one is false, so an
    # error that is already recorded is not cleared.
    # Input:
    #   $exitstatus = exit status to store
    my ($self, $exitstatus) = @_;
    if(not $exitstatus) {
        $self->{'exitstatus'} ||= $exitstatus;
    } else {
        $self->{'exitstatus'} = $exitstatus;
    }
}
sub exitsignal {
    # Returns:
    #   the signal stored by set_exitsignal() (undef if none)
    my ($self) = @_;
    return $self->{'exitsignal'};
}
sub set_exitsignal {
    # Input:
    #   $exitsignal = signal to store for the job
    my ($self, $exitsignal) = @_;
    $self->{'exitsignal'} = $exitsignal;
}
{
    # Kept between calls: the handle of the test file, the test data,
    # and the name of the test file
    my ($disk_full_fh, $b8193, $name);
    sub exit_if_disk_full {
        # Check that data can be written to a temporary file.
        # If not: print an error and exit with 255.
        # Uses:
        #   $ENV{'TMPDIR'} (only in the error message)
        unless($disk_full_fh) {
            # First call: make the test file (unlinked right away) and
            # the 8193 bytes of test data
            ($disk_full_fh, $name) = ::tmpfile(SUFFIX => ".df");
            unlink $name;
            $b8193 = "x"x8193;
        }
        print $disk_full_fh $b8193;
        # If the file position did not move, nothing was written
        my $write_failed = (not $disk_full_fh
                            or
                            tell($disk_full_fh) == 0);
        if($write_failed) {
            ::error("Output is incomplete. Cannot append to buffer file in $ENV{'TMPDIR'}. Is the disk full?\n");
            ::error("Change \$TMPDIR with --tmpdir or use --compress.\n");
            ::wait_and_exit(255);
        }
        # Empty the file again, ready for the next test
        truncate $disk_full_fh, 0;
        seek($disk_full_fh, 0, 0) || die;
    }
}
package CommandLine;
sub new {
    # Create a CommandLine object.
    # Input:
    #   $seq = sequence number of the command line
    #   $commandref = ref to the command (dies if false)
    #   $arg_queue = queue the arguments are read from
    #   $context_replace = context replace flag
    #   $max_number_of_args = max number of args (undef = no limit set)
    #   $return_files = ref to the files to return
    #   $replacecount_ref = ref to hash: replacement string => count
    #   $len_ref = ref to hash of lengths
    # Returns:
    #   a blessed hash. 'replacecount' and 'len' are copies of the
    #   hashes given; in 'len' the entry of every replacement string
    #   is 0.
    my ($class, $seq, $commandref, $arg_queue, $context_replace,
        $max_number_of_args, $return_files, $replacecount_ref,
        $len_ref) = @_;
    $commandref || die;
    my %replacecount = %$replacecount_ref;
    # No arguments yet: every replacement string has length 0
    my %len = (%$len_ref, map { $_ => 0 } keys %$replacecount_ref);
    my %commandline = (
        'command'            => $commandref,
        'seq'                => $seq,
        'len'                => \%len,
        'arg_list'           => [],
        'arg_queue'          => $arg_queue,
        'max_number_of_args' => $max_number_of_args,
        'replacecount'       => \%replacecount,
        'context_replace'    => $context_replace,
        'return_files'       => $return_files,
        'replaced'           => undef,
    );
    return bless \%commandline, ref($class) || $class;
}
sub seq {
    # Returns:
    #   the sequence number given to new()
    my ($self) = @_;
    return $self->{'seq'};
}
{
    # Highest slot number handed out so far
    my $max_slot_number;
    sub slot {
        # Find the slot number of the command line: the first free slot
        # in @Global::slots, or a new slot if none is free. Once
        # assigned the number is kept in $self->{'slot'}.
        # Uses:
        #   @Global::slots = free slot numbers
        # Returns:
        #   $self->{'slot'} = slot number
        my ($self) = @_;
        unless($self->{'slot'}) {
            # No free slot: make a new one
            @Global::slots or push @Global::slots, ++$max_slot_number;
            $self->{'slot'} = shift @Global::slots;
        }
        return $self->{'slot'};
    }
}
sub populate {
# Fill the command line with arguments from the argument queue until
# the queue is empty, the command line is too long, or the max number
# of arguments is reached.
# Uses:
#   $Global::minimal_command_line_length, $opt::cat, $opt::fifo,
#   $opt::m, $opt::X, $Global::max_jobs_running, $Global::JobQueue,
#   $CommandLine::already_spread
my $self = shift;
my $next_arg;
my $max_len = $Global::minimal_command_line_length || Limits::Command::max_length();
if($opt::cat or $opt::fifo) {
# --cat/--fifo: the argument is the name of a temporary file.
# Only the name is needed here, so the file itself is removed.
my($outfh,$name) = ::tmpfile(SUFFIX => ".pip");
close $outfh;
unlink $name;
$Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->unget([Arg->new($name)]);
}
while (not $self->{'arg_queue'}->empty()) {
$next_arg = $self->{'arg_queue'}->get();
if(not defined $next_arg) {
# Skip undefined records
next;
}
$self->push($next_arg);
if($self->len() >= $max_len) {
# The command line is now too long
if($self->number_of_args() > 1) {
# There is more than one arg: put the last one back and stop
$self->{'arg_queue'}->unget($self->pop());
last;
} else {
# A single arg is too long on its own: give up
my $args = join(" ", map { $_->orig() } @$next_arg);
::error("Command line too long (",
$self->len(), " >= ",
$max_len,
") at number ",
$self->{'arg_queue'}->arg_number(),
": ".
(substr($args,0,50))."...\n");
$self->{'arg_queue'}->unget($self->pop());
::wait_and_exit(255);
}
}
if(defined $self->{'max_number_of_args'}) {
if($self->number_of_args() >= $self->{'max_number_of_args'}) {
# The max number of args is reached
last;
}
}
}
if(($opt::m or $opt::X) and not $CommandLine::already_spread
and $self->{'arg_queue'}->empty() and $Global::max_jobs_running) {
# -m/-X and the queue ran dry while filling this command line:
# spread the args of this command line over
# $Global::max_jobs_running command lines. This is done once only.
$CommandLine::already_spread ||= 1;
if($self->number_of_args() > 1) {
# Args per command line, rounded up; also used for the command
# lines made after this one
$self->{'max_number_of_args'} =
::ceil($self->number_of_args()/$Global::max_jobs_running);
$Global::JobQueue->{'commandlinequeue'}->{'max_number_of_args'} =
$self->{'max_number_of_args'};
# Put all args back and take only this command line's share
$self->{'arg_queue'}->unget($self->pop_all());
while($self->number_of_args() < $self->{'max_number_of_args'}) {
$self->push($self->{'arg_queue'}->get());
}
}
}
}
sub push {
    # Add a record to the command line and add the length of its
    # arguments to the length counters of every replacement string.
    # Input:
    #   $record = ref to array of args (undefined args are skipped)
    # Uses:
    #   $Global::noquote, $Global::quoting
    my ($self, $record) = @_;
    CORE::push @{$self->{'arg_list'}}, $record;
    my $quote_arg = $Global::noquote ? 0 : not $Global::quoting;
    for my $arg (grep { defined $_ } @$record) {
        for my $perlexpr (keys %{$self->{'replacecount'}}) {
            my $replaced = $arg->replace($perlexpr,$quote_arg,$self);
            $self->{'len'}{$perlexpr} += length $replaced;
        }
    }
}
sub pop {
    # Remove the last record from the command line and subtract the
    # length of its arguments from the length counters of every
    # replacement string.
    # Uses:
    #   $Global::noquote, $Global::quoting
    # Returns:
    #   $record = the record removed
    my ($self) = @_;
    my $record = CORE::pop @{$self->{'arg_list'}};
    my $quote_arg = $Global::noquote ? 0 : not $Global::quoting;
    for my $arg (grep { defined $_ } @$record) {
        for my $perlexpr (keys %{$self->{'replacecount'}}) {
            my $replaced = $arg->replace($perlexpr,$quote_arg,$self);
            $self->{'len'}{$perlexpr} -= length $replaced;
        }
    }
    return $record;
}
sub pop_all {
    # Remove all records from the command line and set the length
    # counter of every replacement string to 0.
    # Returns:
    #   @popped = the records removed
    my ($self) = @_;
    my @popped = @{$self->{'arg_list'}};
    $self->{'len'}{$_} = 0 for keys %{$self->{'replacecount'}};
    $self->{'arg_list'} = [];
    return @popped;
}
sub number_of_args {
    # Returns:
    #   the number of records in the command line
    my ($self) = @_;
    return scalar @{$self->{'arg_list'}};
}
sub number_of_recargs {
    # Returns:
    #   the number of records times the number of args in the first
    #   record (0 if there are no records)
    my ($self) = @_;
    my $records = $self->{'arg_list'};
    my $nrec = scalar @$records;
    $nrec or return 0;
    return $nrec * (scalar @{$records->[0]});
}
sub args_as_string {
    # Returns:
    #   orig() of all args of all records joined with a space
    my ($self) = @_;
    my @origs;
    for my $record (@{$self->{'arg_list'}}) {
        CORE::push @origs, map { $_->orig() } @$record;
    }
    return (join " ", @origs);
}
sub args_as_dirname {
    # Make a dir name from the args: for every arg of every record
    # (in the order given by header_indexes_sorted()) the header of
    # the input source followed by the escaped value.
    # Escaping: \ => \\, TAB => \t, NUL => \0, / => \_
    # Uses:
    #   %Global::input_source_header
    #   $Global::max_file_length = if set: max length of each value
    # Returns:
    #   the headers and values joined with /
    my ($self) = @_;
    my @parts = ();
    for my $rec_ref (@{$self->{'arg_list'}}) {
        my @header_indexes_sorted = header_indexes_sorted($#$rec_ref+1);
        for my $n (@header_indexes_sorted) {
            CORE::push @parts, $Global::input_source_header{$n};
            for my $orig ($rec_ref->[$n-1]->orig()) {
                my $value = $orig;
                $value =~ s/\\/\\\\/g;
                $value =~ s/\t/\\t/g;
                $value =~ s/\0/\\0/g;
                $value =~ s:/:\\_:g;
                if($Global::max_file_length) {
                    # Keep the name short enough for the file system
                    $value = substr($value,0,$Global::max_file_length);
                }
                CORE::push @parts, $value;
            }
        }
    }
    return join "/", @parts;
}
sub header_indexes_sorted {
    # Sort the column numbers 1..$max_col by the header of the column:
    # first by the numeric value of the header, then by the header as
    # a string.
    # Input:
    #   $max_col = the number of columns
    # Sets:
    #   $Global::input_source_header{$col} = $col for columns without
    #     a header
    # Returns:
    #   @header_indexes_sorted = the column numbers in sorted order
    my ($max_col) = @_;
    # Headers are often not numbers
    no warnings 'numeric';
    my @cols = (1 .. $max_col);
    for my $col (@cols) {
        defined $Global::input_source_header{$col}
            or $Global::input_source_header{$col} = $col;
    }
    my @header_indexes_sorted = sort {
        my ($left, $right) = @Global::input_source_header{$a, $b};
        $left <=> $right or $left cmp $right
    } @cols;
    return @header_indexes_sorted;
}
sub len {
    # Compute the length of the command line with the args that have
    # been added so far. As the factors below show, this is an upper
    # estimate when --nice, quoting or --shellquote is in use.
    # Uses:
    #   $opt::nice, $Global::quoting, $opt::shellquote,
    #   $Global::envvarlen
    # Returns:
    #   $len = the computed length
    my ($self) = @_;
    my $len_of = $self->{'len'};
    my $count_of = $self->{'replacecount'};
    # The text outside the contexts + the spaces between the words of
    # the command
    my $len = 0;
    $len += $len_of->{'noncontext'} + @{$self->{'command'}} -1;
    ::debug("length", "noncontext + command: $len\n");
    my $recargs = $self->number_of_recargs();
    if($self->{'context_replace'}) {
        # The context is repeated for every arg
        $len += $recargs * $len_of->{'context'};
        for my $replstring (keys %$count_of) {
            $len += $len_of->{$replstring} * $count_of->{$replstring};
            ::debug("length", $replstring, " ", $len_of->{$replstring}, "*",
                    $count_of->{$replstring}, "\n");
        }
        ::debug("length", "Ctxgrp: ", $len_of->{'contextgroups'},
                " Groups: ", $len_of->{'noncontextgroups'}, "\n");
        # The separators between the repeated context groups
        $len += ($recargs-1) * ($len_of->{'contextgroups'});
    } else {
        # The context is only used once
        $len += 1*$len_of->{'context'};
        ::debug("length", "context+noncontext + command: $len\n");
        for my $replstring (keys %$count_of) {
            # The args + the separators between them
            $len += ($recargs -1 + $len_of->{$replstring}) *
                $count_of->{$replstring};
        }
    }
    # Each of these options multiplies the length
    for my $factor ($opt::nice ? 2 : (),
                    $Global::quoting ? 2 : (),
                    $opt::shellquote ? 4 : ()) {
        $len *= $factor;
    }
    $len += $Global::envvarlen;
    return $len;
}
sub replaced {
    # Replace the placeholders in the command with the args.
    # The result is computed once and cached in $self->{'replaced'}.
    # The real length and the length computed by len() are written to
    # the debug output.
    # Uses:
    #   $Global::noquote, $Global::quoting
    # Returns:
    #   $self->{'replaced'} = the command with the placeholders replaced
    my ($self) = @_;
    defined $self->{'replaced'} and return $self->{'replaced'};
    my $quote_arg = $Global::noquote ? 0 : not $Global::quoting;
    $self->{'replaced'} = $self->replace_placeholders($self->{'command'},$Global::quoting,$quote_arg);
    my $real_len = length $self->{'replaced'};
    my $relation = ($real_len != $self->len()) ? " != " : " == ";
    ::debug("length", $real_len, $relation, $self->len(), " ", $self->{'replaced'}, "\n");
    return $self->{'replaced'};
}
sub replace_placeholders {
# Replace the placeholders (\257<perl expression\257>) in the target
# strings with values computed from the args of the command line.
# Input:
#   $targetref = ref to array of strings containing placeholders
#   $quote = should the target strings be shell quoted?
#   $quote_arg = passed on to $arg->replace()
# Returns:
#   @target with the placeholders replaced
#   (in scalar context: the elements joined with a space)
my $self = shift;
my $targetref = shift;
my $quote = shift;
my $quote_arg = shift;
my $context_replace = $self->{'context_replace'};
my @target = @$targetref;
::debug("replace", "Replace @target\n");
if(not @target) {
# Nothing to replace
return @target;
}
# Collect the distinct words to replace in %word:
# with context replace a word is the placeholder(s) with the
# non-space text around them; otherwise just the placeholder
my %word;
for (@target) {
# Work on a copy: the words found are cut out of $tt
my $tt = $_;
::debug("replace", "Target: $tt");
if($context_replace) {
while($tt =~ s/([^\s\257]* # before {=
(?:
\257< # {=
[^\257]*? # The perl expression
\257> # =}
[^\s\257]* # after =}
)+)/ /x) {
$word{"$1"} ||= 1;
}
} else {
while($tt =~ s/( (?: \257<([^\257]*?)\257>) )//x) {
$word{$1} ||= 1;
}
}
}
my @word = keys %word;
# %replace: word => ref to array of the values that replace it
my %replace;
# All args of all records as a flat list
my @arg;
for my $record (@{$self->{'arg_list'}}) {
CORE::push @arg, @$record;
}
# With no args an empty Arg is used, so the words are still replaced
if(not @arg) { @arg = (Arg->new("")); }
# NOTE(review): all parameters have been shifted off @_ above, so $n
# looks like it is always 0 here; negative positions then index @arg
# from the end - confirm.
my $n = $#_+1;
# NOTE(review): $job is not used in the visible code; presumably it is
# there for the perl expressions evaluated by $arg->replace() - verify.
my $job = $self;
for my $word (@word) {
my $w = $word;
::debug("replace", "Replacing in $w\n");
# Replace the positional placeholders (those starting with a number)
# with the value of the arg at that position ("" if there is no such
# arg)
$w =~ s< ([^\s\257]*) \257< (-?\d+) ([^\257]*?) \257> # =}
([^\s\257]*) # after =}
>
{ $1. (
$arg[$2 > 0 ? $2-1 : $n+$2] ? $arg[$2 > 0 ? $2-1 : $n+$2]->replace($3,$quote_arg,$self)
: "")
.$4 }egx; ::debug("replace", "Positional replaced $word with: $w\n");
if($w !~ /\257/) {
# No placeholders left: the word is done
if($quote) {
CORE::push(@{$replace{::shell_quote($word)}}, $w);
} else {
CORE::push(@{$replace{$word}}, $w);
}
next;
}
::debug("replace", "Positional done: $w\n");
# The remaining placeholders give one value per arg
for my $arg (@arg) {
my $val = $w;
my $number_of_replacements = 0;
for my $perlexpr (keys %{$self->{'replacecount'}}) {
$number_of_replacements +=
$val =~ s{\257<\Q$perlexpr\E\257>}
{$arg ? $arg->replace($perlexpr,$quote_arg,$self) : ""}eg;
}
my $ww = $word;
if($quote) {
# The targets are quoted below, so the word to look for and its
# value must be quoted, too
$ww = ::shell_quote_scalar($word);
$val = ::shell_quote_scalar($val);
}
if($number_of_replacements) {
CORE::push(@{$replace{$ww}}, $val);
}
}
}
if($quote) {
@target = ::shell_quote(@target);
}
if(%replace) {
# Replace every word in the targets with its values joined by space.
# The longest words are tried first, so a word that is a prefix of
# a longer word does not match in its place.
my $regexp = join('|', map { my $s = $_; $s =~ s/(\W)/\\$1/g; $s }
sort { length $b <=> length $a } keys %replace);
for(@target) {
s/($regexp)/join(" ",@{$replace{$1}})/ge;
}
}
::debug("replace", "Return @target\n");
return wantarray ? @target : "@target";
}
package CommandLineQueue;
sub new {
    # Create a queue that builds command lines from a command template.
    # The template (and @Global::ret_files and $opt::tagstring) is rewritten
    # so that every replacement string becomes \257<perl expression\257>.
    # Input:
    #   $commandref = reference to the command template (list of words)
    #   $read_from = input sources, handed to RecordQueue->new()
    #   $context_replace = stored as 'context_replace'
    #   $max_number_of_args = stored as 'max_number_of_args'
    #   $return_files = stored as 'return_files'
    # Returns:
    #   blessed CommandLineQueue object
    my $class = shift;
    my $commandref = shift;
    my $read_from = shift;
    my $context_replace = shift;
    my $max_number_of_args = shift;
    my $return_files = shift;
    my @unget = ();
    my (%replacecount,$posrpl,%len);
    my @command = @$commandref;
    # If the first command word starts with '-' it is probably a wrong option
    if($command[0] =~ /^\s*(-\S+)/) {
        my $cmd = $1;
        if(not ::which($cmd)) {
            ::error("Command ($cmd) starts with '-'. Is this a wrong option?\n");
            ::wait_and_exit(255);
        }
    }
    # \257 is used internally as the placeholder marker, so it cannot be
    # part of the command. Rewrite {= perl expression =} to \257<...\257>.
    for(@command) {
        if(/\257/) {
            ::error("Command cannot contain the character \257. Use a function for that.\n");
            ::wait_and_exit(255);
        }
        s/\Q$Global::parensleft\E(.*?)\Q$Global::parensright\E/\257<$1\257>/gx;
    }
    for my $rpl (keys %Global::rpl) {
        # Replace the shorthand string with its perl expression
        for(@command,@Global::ret_files) {
            while(s/((^|\257>)[^\257]*?) # Don't replace after \257 unless \257>
                  \Q$rpl\E/$1\257<$Global::rpl{$rpl}\257>/xg) {
            }
        }
        if(defined $opt::tagstring) {
            for($opt::tagstring) {
                while(s/((^|\257>)[^\257]*?) # Don't replace after \257 unless \257>
                      \Q$rpl\E/$1\257<$Global::rpl{$rpl}\257>/x) {}
            }
        }
        # Positional form: {N<rest of shorthand>, e.g. {1.} for {.}
        $posrpl = $rpl;
        if($posrpl =~ s/^\{//) {
            # Only done if the shorthand starts with {
            for(@command,@Global::ret_files) {
                s/\{(-?\d+)\Q$posrpl\E/\257<$1 $Global::rpl{$rpl}\257>/g;
            }
            if(defined $opt::tagstring) {
                # Use the same perl expression as for the command above.
                # (An unset variable was interpolated here before, so the
                # expression was silently dropped from the tagstring.)
                $opt::tagstring =~ s/\{(-?\d+)\Q$posrpl\E/\257<$1 $Global::rpl{$rpl}\257>/g;
            }
        }
    }
    # Count the placeholders and compute the lengths of the context and
    # non-context parts. If the command has no placeholder, one is added
    # and the loop runs once more.
    my $sum = 0;
    while($sum == 0) {
        my @cmd = @command;
        my $contextlen = 0;
        my $noncontextlen = 0;
        my $contextgroups = 0;
        for my $c (@cmd) {
            # Count each perl expression; mark its position with \000
            while($c =~ s/ \257<([^\257]*?)\257> /\000/x) {
                $replacecount{$1} ++;
                $sum++;
            }
            # A word containing a marker is a context group
            while($c =~ s/ (\S*\000\S*) //x) {
                my $w = $1;
                $w =~ tr/\000//d;
                $contextlen += length($w);
                $contextgroups++;
            }
            # What is left of the word is non-context
            $noncontextlen += length $c;
        }
        if($opt::tagstring) {
            # Expressions used only in the tagstring must be known, too
            my $t = $opt::tagstring;
            while($t =~ s/ \257<([^\257]*)\257> //x) {
                $replacecount{$1}||=1;
            }
        }
        $len{'context'} = 0+$contextlen;
        $len{'noncontext'} = $noncontextlen;
        $len{'contextgroups'} = $contextgroups;
        $len{'noncontextgroups'} = @cmd-$contextgroups;
        ::debug("length", "@command Context: ", $len{'context'},
                " Non: ", $len{'noncontext'}, " Ctxgrp: ", $len{'contextgroups'},
                " NonCtxGrp: ", $len{'noncontextgroups'}, "\n");
        if($sum == 0) {
            if(not @command) {
                # No command at all: the argument itself is the command
                @command = ("\257<\257>");
                $Global::noquote = 1;
            } elsif(($opt::pipe or $opt::pipepart)
                    and not $opt::fifo and not $opt::cat) {
                # Plain --pipe/--pipepart: no placeholder is appended
                last;
            } else {
                # No placeholder in the command: append one
                push @command, ("\257<\257>");
            }
        }
    }
    return bless {
        'unget' => \@unget,
        'command' => \@command,
        'replacecount' => \%replacecount,
        'arg_queue' => RecordQueue->new($read_from,$opt::colsep),
        'context_replace' => $context_replace,
        'len' => \%len,
        'max_number_of_args' => $max_number_of_args,
        'size' => undef,
        'return_files' => $return_files,
        'seq' => 1,
    }, ref($class) || $class;
}
sub get {
    # Return the next command line: an ungotten one if any, otherwise a
    # freshly populated CommandLine object.
    # Returns:
    #   CommandLine object, or undef when no arguments are left
    my $self = shift;
    my $pending = $self->{'unget'};
    if(@$pending) {
        return (shift @$pending);
    }
    my $cmd_line = CommandLine->new(
        $self->seq(),
        @{$self}{'command', 'arg_queue', 'context_replace',
                 'max_number_of_args', 'return_files',
                 'replacecount', 'len'},
        );
    $cmd_line->populate();
    ::debug("init","cmd_line->number_of_args ",
            $cmd_line->number_of_args(), "\n");
    if($opt::pipe or $opt::pipepart) {
        # In pipe mode an empty command is an error
        if($cmd_line->replaced() eq "") {
            ::error("--pipe must have a command to pipe into (e.g. 'cat').\n");
            ::wait_and_exit(255);
        }
    } elsif($cmd_line->number_of_args() == 0) {
        # Nothing was read: the queue is exhausted
        return undef;
    } elsif($cmd_line->replaced() eq "") {
        # Empty command line: skip it and try the next one
        return $self->get();
    }
    $self->set_seq($self->seq()+1);
    return $cmd_line;
}
sub unget {
    # Put command lines back at the front of the queue
    my ($self, @cmd_lines) = @_;
    unshift @{$self->{'unget'}}, @cmd_lines;
}
sub empty {
    # Returns a true value if there are no ungotten command lines and the
    # argument queue reports empty
    my $self = shift;
    my $empty = @{$self->{'unget'}} ? !1 : $self->{'arg_queue'}->empty();
    ::debug("run", "CommandLineQueue->empty $empty");
    return $empty;
}
sub seq {
    # Returns the stored sequence number
    my ($self) = @_;
    return $self->{'seq'};
}
sub set_seq {
    # Store a new sequence number
    my ($self, $new_seq) = @_;
    $self->{'seq'} = $new_seq;
}
sub quote_args {
    # Returns the stored 'command' entry (a reference to the command template)
    my ($self) = @_;
    return $self->{'command'};
}
# Return the cached value in $self->{'size'}, computing it on first use by
# collecting CommandLine objects until the argument queue reports empty and
# then putting them back with unget().
# NOTE(review): CommandLine->new is called here with 4 arguments starting
# with the command, while get() in this package passes the sequence number
# first and 8 arguments in total. Nothing in this loop visibly reads from
# 'arg_queue' either, so it may not terminate on non-empty input. This sub
# looks unused/stale - confirm before relying on it.
sub size {
my $self = shift;
if(not $self->{'size'}) {
my @all_lines = ();
while(not $self->{'arg_queue'}->empty()) {
push @all_lines, CommandLine->new($self->{'command'},
$self->{'arg_queue'},
$self->{'context_replace'},
$self->{'max_number_of_args'});
}
$self->{'size'} = @all_lines;
$self->unget(@all_lines);
}
return $self->{'size'};
}
package Limits::Command;
sub max_length {
    # Find the maximal command line length, using a per-host cache file
    # $HOME/.parallel/tmp/linelen-<hostname>.
    # The result is kept in $Limits::Command::line_max_len so the work is
    # only done once. $opt::max_chars lowers the limit if it is not bigger
    # than the measured one.
    # Returns:
    #   number of chars on the longest command line allowed
    if(not $Limits::Command::line_max_len) {
        my $len_cache = $ENV{'HOME'} . "/.parallel/tmp/linelen-" . ::hostname();
        my $cached_limit;
        if(-e $len_cache) {
            open(my $fh, "<", $len_cache) || ::die_bug("Cannot read $len_cache");
            my $line = <$fh>;
            close $fh;
            # Only trust the cache if it holds a positive integer: an
            # empty or damaged file must not give an undefined limit.
            if(defined $line and $line =~ /^\s*(\d+)\s*$/ and $1 > 0) {
                $cached_limit = $1;
            }
        }
        if(not defined $cached_limit) {
            # No usable cache: measure and (best effort) save the result
            $cached_limit = real_max_length();
            mkdir($ENV{'HOME'} . "/.parallel");
            mkdir($ENV{'HOME'} . "/.parallel/tmp");
            if(open(my $fh, ">", $len_cache)) {
                print $fh $cached_limit;
                close $fh;
            }
        }
        $Limits::Command::line_max_len = $cached_limit;
        if($opt::max_chars) {
            if($opt::max_chars <= $cached_limit) {
                $Limits::Command::line_max_len = $opt::max_chars;
            } else {
                ::warning("Value for -s option ",
                          "should be < $cached_limit.\n");
            }
        }
    }
    return $Limits::Command::line_max_len;
}
sub real_max_length {
    # Find the longest accepted command line by growing the length by a
    # factor 16 until it is rejected, then searching between the last two
    # lengths.
    # Returns:
    #   the length found (or the probe length if it exceeds 8_000_000)
    my $ceiling = 8_000_000;
    my $probe = 8;
    while(1) {
        return $probe if $probe > $ceiling;
        $probe *= 16;
        is_acceptable_command_line_length($probe) or last;
    }
    # The previous length worked, $probe did not
    return binary_find_max_length(int($probe/16), $probe);
}
sub binary_find_max_length {
    # Binary search for the longest accepted command line length.
    # Input:
    #   $lower = a length assumed to be accepted
    #   $upper = a length assumed to be rejected
    # Returns:
    #   $lower once the interval cannot be halved any more
    my ($low, $high) = @_;
    until($low == $high or $low == $high-1) {
        my $mid = int (($high-$low)/2 + $low);
        ::debug("init", "Maxlen: $low,$high,$mid : ");
        if(is_acceptable_command_line_length($mid)) {
            $low = $mid;
        } else {
            $high = $mid;
        }
    }
    return $low;
}
sub is_acceptable_command_line_length {
    # Test if a command line of $len chars ("true " is prepended) can be run.
    # STDERR is sent to /dev/null while the command runs.
    # Returns:
    #   true if the command exited with status 0
    my ($len) = @_;
    local *STDERR;
    open (STDERR, ">", "/dev/null");
    my $probe = "true " . ("x" x $len);
    system $probe;
    close STDERR;
    ::debug("init", "$len=$? ");
    return not $?;
}
package RecordQueue;
sub new {
    # Input:
    #   $fhs = input sources, handed to the sub queue
    #   $colsep = if true, records are read through a RecordColQueue,
    #             otherwise through a MultifileQueue
    # Returns:
    #   blessed RecordQueue object
    my ($class, $fhs, $colsep) = @_;
    my $sub_queue_class = $colsep ? 'RecordColQueue' : 'MultifileQueue';
    my $self = {
        'unget' => [],
        'arg_number' => 0,
        'arg_sub_queue' => $sub_queue_class->new($fhs),
    };
    return bless $self, ref($class) || $class;
}
sub get {
    # Returns:
    #   the next record: an ungotten one if any, else one from the sub queue.
    #   If $Global::max_number_of_args is 0 a record is still read, but a
    #   record with a single empty Arg is returned instead.
    my $self = shift;
    my $pending = $self->{'unget'};
    if(@$pending) {
        $self->{'arg_number'}++;
        return shift @$pending;
    }
    my $record = $self->{'arg_sub_queue'}->get();
    my $zero_args = (defined $Global::max_number_of_args
                     and $Global::max_number_of_args == 0);
    return $record unless $zero_args;
    ::debug("run", "Read 1 but return 0 args\n");
    return [Arg->new("")];
}
sub unget {
    # Put records back at the front of the queue and lower the argument
    # counter by the number of records given
    my ($self, @records) = @_;
    ::debug("run", "RecordQueue-unget '@records'\n");
    $self->{'arg_number'} -= @records;
    unshift @{$self->{'unget'}}, @records;
}
sub empty {
    # Returns a true value if nothing is ungotten and the sub queue
    # reports empty
    my ($self) = @_;
    my $empty = !@{$self->{'unget'}} && $self->{'arg_sub_queue'}->empty();
    ::debug("run", "RecordQueue->empty $empty");
    return $empty;
}
sub arg_number {
    # Returns the stored argument counter
    my ($self) = @_;
    return $self->{'arg_number'};
}
package RecordColQueue;
sub new {
    # Input:
    #   $fhs = input sources, handed to MultifileQueue->new()
    # Returns:
    #   blessed RecordColQueue object
    my ($class, $fhs) = @_;
    my $self = {
        'unget' => [],
        'arg_sub_queue' => MultifileQueue->new($fhs),
    };
    return bless $self, ref($class) || $class;
}
sub get {
    # Read one record from the sub queue and split every argument of it
    # into columns on $opt::colsep.
    # Returns:
    #   reference to a list of Arg objects (one per column),
    #   or undef if nothing could be read
    my $self = shift;
    my $pending = $self->{'unget'};
    if(@$pending) {
        return shift @$pending;
    }
    my $source = $self->{'arg_sub_queue'};
    if($source->empty()) {
        return undef;
    }
    my $in_record = $source->get();
    if(not defined $in_record) {
        return undef;
    }
    my @out_record = ();
    for my $arg (@$in_record) {
        ::debug("run", "RecordColQueue::arg $arg\n");
        my $line = $arg->orig();
        ::debug("run", "line='$line'\n");
        # An empty line gives a single empty column; otherwise keep
        # trailing empty columns (limit -1)
        my @columns = ($line ne "")
            ? split(/$opt::colsep/o, $line, -1)
            : ("");
        for my $column (@columns) {
            push @out_record, Arg->new($column);
        }
    }
    return \@out_record;
}
sub unget {
    # Put records back at the front of the queue
    my ($self, @records) = @_;
    ::debug("run", "RecordColQueue-unget '@records'\n");
    unshift @{$self->{'unget'}}, @records;
}
sub empty {
    # Returns a true value if nothing is ungotten and the sub queue
    # reports empty
    my ($self) = @_;
    my $empty = @{$self->{'unget'}} ? !1 : $self->{'arg_sub_queue'}->empty();
    ::debug("run", "RecordColQueue->empty $empty");
    return $empty;
}
package MultifileQueue;
# Shared unget buffer: MultifileQueue->new() stores a reference to this
# array as the object's 'unget' list, and empty() also checks it directly.
@Global::unget_argv=();
sub new {
    # Input:
    #   $fhs = reference to a list of file handles to read arguments from
    # Warns for every file handle that is a terminal.
    # Returns:
    #   blessed MultifileQueue object sharing @Global::unget_argv as 'unget'
    my ($class, $fhs) = @_;
    for my $fh (@$fhs) {
        -t $fh or next;
        ::warning("Input is read from the terminal. ".
                  "Only experts do this on purpose. ".
                  "Press CTRL-D to exit.\n");
    }
    my $self = {
        'unget' => \@Global::unget_argv,
        'fhs' => $fhs,
        'arg_matrix' => undef,
    };
    return bless $self, ref($class) || $class;
}
sub get {
    # Returns the next record: read with xapply_get() if $opt::xapply is
    # set, otherwise with nest_get()
    my ($self) = @_;
    return $opt::xapply ? $self->xapply_get() : $self->nest_get();
}
sub unget {
    # Put records back at the front of the queue
    my ($self, @records) = @_;
    ::debug("run", "MultifileQueue-unget '@records'\n");
    unshift @{$self->{'unget'}}, @records;
}
sub empty {
    # Returns a true value if nothing is ungotten and every file handle
    # is at end of file
    my ($self) = @_;
    my $empty = !(@Global::unget_argv or @{$self->{'unget'}});
    for my $fh (@{$self->{'fhs'}}) {
        # Once something is known to be left there is no need to look further
        $empty or last;
        $empty = eof($fh);
    }
    ::debug("run", "MultifileQueue->empty $empty ");
    return $empty;
}
sub xapply_get {
    # Read one argument from each file handle and return them as a record.
    # A file handle that has run out reuses its earlier arguments in
    # rotation.
    # Returns:
    #   reference to a list of Arg objects,
    #   or undef if no file handle gave a new argument
    my $self = shift;
    my $pending = $self->{'unget'};
    if(@$pending) {
        return shift @$pending;
    }
    my @record = ();
    my $got_new = 0;
    for my $fh (@{$self->{'fhs'}}) {
        # Arguments seen so far from this file handle
        my $seen = \@{$self->{'arg_matrix'}{$fh}};
        my $arg = read_arg_from_fh($fh);
        if(defined $arg) {
            push @$seen, $arg;
            push @record, $arg;
            $got_new = 1;
        } else {
            ::debug("run", "EOA ");
            # Rotate: move the oldest argument to the end and use it
            push @$seen, shift @$seen;
            push @record, $seen->[-1];
        }
    }
    return $got_new ? \@record : undef;
}
# Return the next record when the input sources are combined with each
# other (every argument of one source with every argument of the others).
# Arguments read so far are kept per source in $self->{'arg_matrix'}.
# Returns:
#   reference to a list of Arg objects (one per input source),
#   or undef when nothing is left
sub nest_get {
my $self = shift;
if(@{$self->{'unget'}}) {
return shift @{$self->{'unget'}};
}
# @record, $prepend and $empty are not used below
my @record = ();
my $prepend = undef;
my $empty = 1;
my $no_of_inputsources = $#{$self->{'fhs'}} + 1;
if(not $self->{'arg_matrix'}) {
# First call: read one argument from every source. A source with no
# input gets an empty Arg. If no source had any input: return undef.
my @first_arg_set;
my $all_empty = 1;
for (my $fhno = 0; $fhno < $no_of_inputsources ; $fhno++) {
my $arg = read_arg_from_fh($self->{'fhs'}[$fhno]);
if(defined $arg) {
$all_empty = 0;
}
$self->{'arg_matrix'}[$fhno][0] = $arg || Arg->new("");
push @first_arg_set, $self->{'arg_matrix'}[$fhno][0];
}
if($all_empty) {
return undef;
}
return [@first_arg_set];
}
# With a single source there is nothing to combine: just read the next
# argument (it is not stored in arg_matrix).
if($no_of_inputsources == 1) {
my $arg = read_arg_from_fh($self->{'fhs'}[0]);
if(defined($arg)) {
return [$arg];
}
return undef;
}
# Walk the sources from the last to the first and read one new argument
# from the first source found that is not at end of file.
for (my $fhno = $no_of_inputsources - 1; $fhno >= 0; $fhno--) {
if(eof($self->{'fhs'}[$fhno])) {
next;
} else {
my $arg = read_arg_from_fh($self->{'fhs'}[$fhno]);
# read_arg_from_fh can return undef although eof() was false;
# in that case go on with the next source.
defined($arg) || next; my $len = $#{$self->{'arg_matrix'}[$fhno]} + 1;
$self->{'arg_matrix'}[$fhno][$len] = $arg;
# Index ranges for expand_combinations: every known argument of the
# other sources, but only the new argument for source $fhno.
my @combarg = ();
for (my $fhn = 0; $fhn < $no_of_inputsources; $fhn++) {
push @combarg, [0, $#{$self->{'arg_matrix'}[$fhn]}];
}
$combarg[$fhno] = [$len,$len]; my @mapped;
# Turn each index combination into a record of Arg objects
for my $c (expand_combinations(@combarg)) {
my @a;
for my $n (0 .. $no_of_inputsources - 1 ) {
push @a, $self->{'arg_matrix'}[$n][$$c[$n]];
}
push @mapped, \@a;
}
# Queue all new records and hand out the first one
push @{$self->{'unget'}}, @mapped;
return shift @{$self->{'unget'}};
}
}
# No source gave a new argument: return what is left in the unget
# queue (undef if it is empty)
return shift @{$self->{'unget'}};
}
sub read_arg_from_fh {
    # Read one argument from a file handle.
    # The record separator $/ is removed from the end of the line.
    # If $Global::end_of_file_string is set and the line equals it, the
    # file handle is closed and reading stops.
    # If $Global::ignore_empty is set, lines with only white space are
    # skipped.
    # If $Global::max_lines is set, a line ending in white space is joined
    # with the following line.
    # Input:
    #   $fh = file handle to read from
    # Returns:
    #   Arg object with the argument read,
    #   or undef if end of file (or the end of file string) was met and
    #   there was no pending joined line
    my $fh = shift;
    my $prepend = undef;
    my $arg;
    READ: while(1) {
        $arg = <$fh>;
        # Test for defined, not for truth: a final line "0" without a
        # trailing record separator is a valid argument.
        if(not defined $arg) {
            if(defined $prepend) {
                return Arg->new($prepend);
            } else {
                return undef;
            }
        }
        # Remove the record separator
        $arg =~ s:$/$::;
        if($Global::end_of_file_string and
           $arg eq $Global::end_of_file_string) {
            # Ignore the rest of the input
            close $fh;
            ::debug("run", "EOF-string ($arg) met\n");
            if(defined $prepend) {
                return Arg->new($prepend);
            } else {
                return undef;
            }
        }
        if(defined $prepend) {
            # Join with the pending line that ended in white space
            $arg = $prepend.$arg;
            $prepend = undef;
        }
        if($Global::ignore_empty) {
            if($arg =~ /^\s*$/) {
                # Skip this line and read the next
                next READ;
            }
        }
        if($Global::max_lines) {
            if($arg =~ /\s$/) {
                # Trailing white space: the argument continues on the next line
                $prepend = $arg;
                next READ;
            }
        }
        last READ;
    }
    if(defined $arg) {
        return Arg->new($arg);
    } else {
        ::die_bug("multiread arg undefined");
    }
}
sub expand_combinations {
    # Input:
    #   ([xmin,xmax], [ymin,ymax], ...)
    # Returns:
    #   ([x,y,...], [x,y,...], ...) with every x in xmin..xmax combined
    #   with every y in ymin..ymax and so on; the first position varies
    #   slowest
    my ($range, @other_ranges) = @_;
    my $low = $range->[0];
    my $high = $range->[1];
    # With no ranges left each value stands alone (one empty tail)
    my @tails = @other_ranges ? expand_combinations(@other_ranges) : ([]);
    my @combinations;
    for(my $value = $low; $value <= $high; $value++) {
        push @combinations, map { [$value, @$_] } @tails;
    }
    return @combinations;
}
package Arg;
sub new {
    # Input:
    #   $orig = the argument as read
    # If $opt::hostgroups is set, a trailing @group1+group2 is removed from
    # the argument and used as its hostgroups. If none of these is a known
    # hostgroup (or no @ is given) all known hostgroups are used.
    # Returns:
    #   blessed Arg object
    my ($class, $orig) = @_;
    my @hostgroups;
    if($opt::hostgroups) {
        my $use_all_groups = 1;
        if($orig =~ s:@(.+)::) {
            @hostgroups = split(/\+/, $1);
            if(grep { defined $Global::hostgroups{$_} } @hostgroups) {
                $use_all_groups = 0;
            } else {
                ::warning("No such hostgroup (@hostgroups)\n");
            }
        }
        if($use_all_groups) {
            @hostgroups = (keys %Global::hostgroups);
        }
    }
    my $self = {
        'orig' => $orig,
        'hostgroups' => \@hostgroups,
    };
    return bless $self, ref($class) || $class;
}
sub replace {
    # Compute the value of this argument for a given perl expression.
    # The perl expression is compiled once (kept in %Global::perleval) and
    # is run with $_ set to the (trimmed) argument; the resulting $_ is the
    # value. Values are cached in the object, both unquoted and shell quoted.
    # Input:
    #   $perlexpr = perl expression, optionally prefixed with a position
    #   $quote = true if the shell quoted value is wanted
    #   $job = passed to the compiled perl expression
    # Returns:
    #   the (quoted or unquoted) value
    my ($self, $perlexpr, $quote, $job) = @_;
    $quote = $quote ? 1 : 0;
    # Remove the position prefix, if any
    $perlexpr =~ s/^-?\d+ //;
    if(not defined $self->{"rpl",0,$perlexpr}) {
        local $_;
        $_ = ($Global::trim eq "n")
            ? $self->{'orig'}
            : trim_of($self->{'orig'});
        ::debug("replace", "eval ", $perlexpr, " ", $_, "\n");
        if(not $Global::perleval{$perlexpr}) {
            # Compile the expression into a sub the first time it is seen
            $Global::perleval{$perlexpr} =
                eval('sub { no strict; no warnings; my $job = shift; '.
                     $perlexpr.' }');
            if(not $Global::perleval{$perlexpr}) {
                ::error("Cannot use $perlexpr: $@\n");
                ::wait_and_exit(255);
            }
        }
        $Global::perleval{$perlexpr}->($job);
        $self->{"rpl",0,$perlexpr} = $_;
    }
    if(not defined $self->{"rpl",$quote,$perlexpr}) {
        # Only the quoted value can be missing here
        $self->{"rpl",1,$perlexpr} =
            ::shell_quote_scalar($self->{"rpl",0,$perlexpr});
    }
    return $self->{"rpl",$quote,$perlexpr};
}
sub orig {
    # Returns the argument as it was stored by new()
    my ($self) = @_;
    return $self->{'orig'};
}
sub trim_of {
    # Remove white space as given by $Global::trim:
    #   n = nothing, l = from the left, r = from the right, rl/lr = both
    # Undefined input strings are treated as empty strings.
    # Returns:
    #   list context: the trimmed strings
    #   scalar context: the trimmed strings joined with a single space
    my @strings = map { defined $_ ? $_ : "" } (@_);
    my $mode = $Global::trim;
    my $both = ($mode eq "rl" or $mode eq "lr");
    my $left = ($both or $mode eq "l");
    my $right = ($both or $mode eq "r");
    if(not ($mode eq "n" or $left or $right)) {
        ::error("--trim must be one of: r l rl lr.\n");
        ::wait_and_exit(255);
    }
    for my $string (@strings) {
        $left and $string =~ s/^\s+//;
        $right and $string =~ s/\s+$//;
    }
    return wantarray ? @strings : "@strings";
}
package TimeoutQueue;
sub new {
    # Input:
    #   $delta_time = timeout; if it contains a percentage (e.g. 200%) the
    #     percentage is stored as a fraction in 'pct' and the timeout
    #     starts at 1_000_000
    # Returns:
    #   blessed TimeoutQueue object
    my ($class, $delta_time) = @_;
    my $pct;
    my ($percentage) = ($delta_time =~ /(\d+(\.\d+)?)%/);
    if(defined $percentage) {
        $pct = $percentage/100;
        $delta_time = 1_000_000;
    }
    my $self = {
        'queue' => [],
        'delta_time' => $delta_time,
        'pct' => $pct,
        'remedian_idx' => 0,
        'remedian_arr' => [],
        'remedian' => undef,
    };
    return bless $self, ref($class) || $class;
}
sub delta_time {
    # Returns the stored timeout
    my ($self) = @_;
    return $self->{'delta_time'};
}
sub set_delta_time {
    # Store a new timeout
    my ($self, $new_delta_time) = @_;
    $self->{'delta_time'} = $new_delta_time;
}
sub remedian {
    # Returns the stored remedian value
    my ($self) = @_;
    return $self->{'remedian'};
}
sub set_remedian {
    # Add a value and update the estimated median ('remedian').
    # Values are kept in three levels of up to 999 slots each: level 0
    # holds the values, level 1 the medians of level 0, level 2 the medians
    # of level 1. The remedian is the median of level 2.
    # Input:
    #   $val = the (numeric) value to add
    my $self = shift;
    my $val = shift;
    my $i = $self->{'remedian_idx'}++;
    my $rref = $self->{'remedian_arr'};
    $rref->[0][$i%999] = $val;
    # The values are numbers, so they must be sorted numerically:
    # the default string sort puts 10 and 100 before 9.
    $rref->[1][$i/999%999] =
        (sort { $a <=> $b } @{$rref->[0]})[$#{$rref->[0]}/2];
    $rref->[2][$i/999/999%999] =
        (sort { $a <=> $b } @{$rref->[1]})[$#{$rref->[1]}/2];
    $self->{'remedian'} =
        (sort { $a <=> $b } @{$rref->[2]})[$#{$rref->[2]}/2];
}
sub update_delta_time {
    # If the timeout is a percentage: add $runtime to the remedian and set
    # the timeout to that percentage of the remedian.
    # Input:
    #   $runtime = runtime of a job
    my ($self, $runtime) = @_;
    if($self->{'pct'}) {
        $self->set_remedian($runtime);
        my $new_timeout = $self->{'pct'} * $self->remedian();
        $self->{'delta_time'} = $new_timeout;
        ::debug("run", "Timeout: $self->{'delta_time'}s ");
    }
}
sub process_timeouts {
    # Look at the jobs at the front of the queue: finished jobs are
    # dropped, timed out jobs are dropped and killed. Stop at the first
    # job that is neither.
    my ($self) = @_;
    my $queue = $self->{'queue'};
    while (@$queue) {
        my $job = $queue->[0];
        if($job->endtime()) {
            # Already finished
            shift @$queue;
            next;
        }
        $job->timedout($self->{'delta_time'}) or last;
        shift @$queue;
        $job->kill();
    }
}
sub insert {
    # Add a job at the end of the queue
    my ($self, $job) = @_;
    push @{$self->{'queue'}}, $job;
}
package Semaphore;
sub new {
    # Input:
    #   $id = name of the semaphore; chars other than -_a-z0-9 (any case)
    #         are hex encoded and "id-" is prepended
    #   $count = semaphore count; must be at least 1
    # Makes sure $HOME/.parallel and $HOME/.parallel/semaphores exist.
    # Returns:
    #   blessed Semaphore object ('count' is stored as $count + 1)
    my ($class, $id, $count) = @_;
    $id =~ s/([^-_a-z0-9])/unpack("H*",$1)/ige;
    $id = "id-".$id;
    my $parallel_dir = $ENV{'HOME'}."/.parallel";
    my $parallel_locks = $parallel_dir."/semaphores";
    for my $dir ($parallel_dir, $parallel_locks) {
        -d $dir or mkdir_or_die($dir);
    }
    my $lockdir = "$parallel_locks/$id";
    my $lockfile = $lockdir.".lock";
    if($count < 1) { ::die_bug("semaphore-count: $count"); }
    my $self = {
        'lockfile' => $lockfile,
        'lockfh' => Symbol::gensym(),
        'lockdir' => $lockdir,
        'id' => $id,
        'idfile' => $lockdir."/".$id,
        'pid' => $$,
        'pidfile' => $lockdir."/".$$.'@'.::hostname(),
        'count' => $count + 1,
    };
    return bless $self, ref($class) || $class;
}
sub acquire {
    # Wait until the semaphore can be taken.
    # Between attempts, pid files in the lock dir that belong to processes
    # on this host which no longer exist are removed.
    # If $opt::timeout is defined and that many seconds have passed since
    # the first attempt, the semaphore is taken anyway.
    my $self = shift;
    my $sleep = 1;
    my $start_time = time;
    while(1) {
        $self->atomic_link_if_count_less_than() and last;
        ::debug("sem", "Remove dead locks");
        my $lockdir = $self->{'lockdir'};
        for my $d (glob "$lockdir/*") {
            ::debug("sem", "Lock $d $lockdir\n");
            # Pid files are named pid@host
            $d =~ m:$lockdir/([0-9]+)\@([-\._a-z0-9]+)$:o or next;
            my ($pid, $host) = ($1, $2);
            if($host eq ::hostname()) {
                # Signal 0 only tests if the process exists
                if(not kill 0, $pid) {
                    ::debug("sem", "Dead: $d");
                    unlink $d;
                } else {
                    ::debug("sem", "Alive: $d");
                }
            }
        }
        # Try again now that dead locks are gone
        $self->atomic_link_if_count_less_than() and last;
        # Retry slower and slower; randomized so that waiting processes
        # do not all wake up at the same time
        $sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
        ::usleep(rand()*$sleep);
        if(defined($opt::timeout) and
           $start_time + $opt::timeout < time) {
            # The timeout has passed: force the semaphore.
            # (The comparison used to be inverted, which forced the
            # semaphore before the timeout instead of after it.)
            if(not -e $self->{'idfile'}) {
                open (my $fh, ">", $self->{'idfile'}) or
                    ::die_bug("timeout_write_idfile: $self->{'idfile'}");
                close $fh;
            }
            link $self->{'idfile'}, $self->{'pidfile'};
            last;
        }
    }
    ::debug("sem", "acquired $self->{'pid'}\n");
}
sub release {
    # Give the semaphore back: remove the pid file and, if the id file has
    # only one link left, remove the id file and the lock dir.
    my ($self) = @_;
    unlink $self->{'pidfile'};
    my $seems_last = ($self->nlinks() == 1);
    if($seems_last) {
        # Check again while holding the lock
        $self->lock();
        if($self->nlinks() == 1) {
            unlink $self->{'idfile'};
            rmdir $self->{'lockdir'};
        }
        $self->unlock();
    }
    ::debug("run", "released $self->{'pid'}\n");
}
sub _release {
    # Variant of release() that rewrites the id file with one '#' per
    # remaining link instead of relying on the link count alone.
    my ($self) = @_;
    unlink $self->{'pidfile'};
    $self->lock();
    my $links_before = $self->nlinks();
    ::debug("sem", $links_before, "<", $self->{'count'});
    my $links_after = $links_before - 1;
    unlink $self->{'idfile'};
    if($links_before > 1) {
        # Others still hold the semaphore: write the new count
        open (my $fh, ">", $self->{'idfile'}) or
            ::die_bug("write_idfile: $self->{'idfile'}");
        print $fh "#"x$links_after;
        close $fh;
    } else {
        rmdir $self->{'lockdir'};
    }
    $self->unlock();
    ::debug("sem", "released $self->{'pid'}\n");
}
sub atomic_link_if_count_less_than {
    # While holding the lock: if the id file has fewer links than
    # $self->{'count'}, create the lock dir and id file if needed and link
    # the id file to this process' pid file.
    # Returns:
    #   the result of link() (true if the semaphore was taken), else 0
    my $self = shift;
    my $retval = 0;
    $self->lock();
    my $nlinks = $self->nlinks();
    # The first argument of ::debug is the category; it was missing here,
    # so the link count was used as the category.
    ::debug("sem", $nlinks, "<", $self->{'count'});
    if($nlinks < $self->{'count'}) {
        -d $self->{'lockdir'} or mkdir_or_die($self->{'lockdir'});
        if(not -e $self->{'idfile'}) {
            open (my $fh, ">", $self->{'idfile'}) or
                ::die_bug("write_idfile: $self->{'idfile'}");
            close $fh;
        }
        $retval = link $self->{'idfile'}, $self->{'pidfile'};
    }
    $self->unlock();
    ::debug("run", "atomic $retval");
    return $retval;
}
sub _atomic_link_if_count_less_than {
    # Variant of atomic_link_if_count_less_than() that also writes one '#'
    # per link (including the new one) into the id file.
    # Returns:
    #   the result of link() (true if the semaphore was taken), else 0
    my ($self) = @_;
    my $retval = 0;
    $self->lock();
    my $links_before = $self->nlinks();
    ::debug("sem", $links_before, "<", $self->{'count'});
    my $links_after = $links_before + 1;
    if($links_before < $self->{'count'}) {
        -d $self->{'lockdir'} or mkdir_or_die($self->{'lockdir'});
        if(not -e $self->{'idfile'}) {
            # Create the id file
            open (my $fh, ">", $self->{'idfile'}) or
                ::die_bug("write_idfile: $self->{'idfile'}");
            close $fh;
        }
        # Write the new count and take the semaphore
        open (my $fh, ">", $self->{'idfile'}) or
            ::die_bug("write_idfile: $self->{'idfile'}");
        print $fh "#"x$links_after;
        close $fh;
        $retval = link $self->{'idfile'}, $self->{'pidfile'};
    }
    $self->unlock();
    ::debug("sem", "atomic $retval");
    return $retval;
}
sub nlinks {
    # Returns:
    #   the number of hard links to the id file, or 0 if it does not exist
    my ($self) = @_;
    -e $self->{'idfile'} or return 0;
    # stat(_) reuses the result of the -e test above
    ::debug("sem", "nlinks", (stat(_))[3], "size", (stat(_))[7], "\n");
    return (stat(_))[3];
}
# Take an exclusive, non-blocking flock on $self->{'lockfile'}, retrying
# with growing random sleeps until it succeeds.
# The loop is also left without holding the lock if flock reports
# "Function not implemented" or if $opt::semaphoretimeout is exceeded.
# Nothing is returned to tell these cases apart.
sub lock {
my $self = shift;
# NOTE(review): $sleep and $total_sleep look like milliseconds (the total
# is divided by 1000 before it is compared with seconds) - confirm the
# unit of ::usleep.
my $sleep = 100; my $total_sleep = 0;
# Load Fcntl (for the LOCK_* constants) only once
$Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
# $locked: 0 = keep trying, 1 = flock succeeded,
# 2 = flock not implemented, 3 = timed out
my $locked = 0;
while(not $locked) {
# tell() gives -1 if the file handle is not open: (re)open the lock file
if(tell($self->{'lockfh'}) == -1) {
open($self->{'lockfh'}, ">", $self->{'lockfile'})
or ::debug("run", "Cannot open $self->{'lockfile'}");
}
if($self->{'lockfh'}) {
chmod 0666, $self->{'lockfile'}; if(flock($self->{'lockfh'}, LOCK_EX()|LOCK_NB())) {
$locked = 1;
last;
} else {
# The test depends on the text of the error message.
if ($! =~ m/Function not implemented/) {
::warning("flock: $!");
::warning("Will wait for a random while\n");
::usleep(rand(5000));
$locked = 2;
last;
}
}
}
# Grow the maximal sleep by 10% per round until it reaches 1000
$sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
::usleep(rand()*$sleep);
# The maximal sleep is added, not the random time actually slept
$total_sleep += $sleep;
if($opt::semaphoretimeout) {
if($total_sleep/1000 > $opt::semaphoretimeout) {
::warning("Semaphore timed out. Ignoring timeout.");
$locked = 3;
last;
}
} else {
# Without a timeout only warn (on every round after 30) and keep trying
if($total_sleep/1000 > 30) {
::warning("Semaphore stuck for 30 seconds. Consider using --semaphoretimeout.");
}
}
}
::debug("run", "locked $self->{'lockfile'}");
}
sub unlock {
    # Remove the lock file and close its file handle
    my ($self) = @_;
    my ($lockfile, $lockfh) = @{$self}{'lockfile', 'lockfh'};
    unlink $lockfile;
    close $lockfh;
    ::debug("run", "unlocked\n");
}
sub mkdir_or_die {
    # Create a directory including missing parent directories.
    # Exits with an error if the directory is not writable afterwards.
    # Input:
    #   $dir = directory to create (absolute or relative)
    my $dir = shift;
    my @dir_parts = split(m:/:,$dir);
    # Start from / for absolute paths and from . for relative paths.
    # (Relative paths used to be created under / by mistake.)
    my $ddir = ($dir =~ m:^/:) ? "" : ".";
    my $part;
    while(defined ($part = shift @dir_parts)) {
        # Skip empty parts from leading or double slashes
        $part eq "" and next;
        $ddir .= "/".$part;
        -d $ddir and next;
        mkdir $ddir;
    }
    if(not -w $dir) {
        ::error("Cannot write to $dir: $!\n");
        ::wait_and_exit(255);
    }
}
# Set these four package variables to 0
$opt::x = $Semaphore::timeout = $Semaphore::wait =
$Job::file_descriptor_warning_printed = 0;