#![cfg(test)]
#![allow(unused_imports)]
use super::super::affinity::*;
use super::super::config::*;
use super::super::types::*;
use super::super::worker::*;
use super::testing::*;
use super::*;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
#[test]
fn spawn_guard_cleans_up_on_interworker_pipe_emfile() {
let code = run_in_forked_child(|| {
let baseline = count_open_fds();
let mut original_rlimit = libc::rlimit {
rlim_cur: 0,
rlim_max: 0,
};
if unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut original_rlimit) } != 0 {
return 13;
}
let target_cur = (baseline + 5) as u64;
let lowered = libc::rlimit {
rlim_cur: target_cur,
rlim_max: original_rlimit.rlim_max,
};
if unsafe { libc::setrlimit(libc::RLIMIT_NOFILE, &lowered) } != 0 {
return 13;
}
let config = WorkloadConfig {
num_workers: 4,
affinity: AffinityIntent::Inherit,
work_type: WorkType::PipeIo { burst_iters: 1 },
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let result = WorkloadHandle::spawn(&config);
if result.is_ok() {
return 10; }
let err_msg = format!("{:#}", result.as_ref().err().unwrap());
if unsafe { libc::setrlimit(libc::RLIMIT_NOFILE, &original_rlimit) } != 0 {
return 15;
}
if !err_msg.contains("pipe2 failed") {
return 14;
}
let after = count_open_fds();
if after > baseline {
return 11; }
if any_zombie_child() {
return 12;
}
0
});
assert_eq!(
code, 0,
"child reported cleanup defect (code {code}): see exit-code table above \
spawn_guard_cleans_up_on_interworker_pipe_emfile"
);
}
#[test]
fn spawn_guard_cleans_up_on_wake_chain_pipe_emfile() {
let code = run_in_forked_child(|| {
let baseline = count_open_fds();
let mut original_rlimit = libc::rlimit {
rlim_cur: 0,
rlim_max: 0,
};
if unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut original_rlimit) } != 0 {
return 13;
}
let target_cur = (baseline + 5) as u64;
let lowered = libc::rlimit {
rlim_cur: target_cur,
rlim_max: original_rlimit.rlim_max,
};
if unsafe { libc::setrlimit(libc::RLIMIT_NOFILE, &lowered) } != 0 {
return 13;
}
let config = WorkloadConfig {
num_workers: 4,
affinity: AffinityIntent::Inherit,
work_type: WorkType::WakeChain {
depth: 4,
wake: WakeMechanism::Pipe,
work_per_hop: Duration::from_micros(100),
},
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let result = WorkloadHandle::spawn(&config);
if result.is_ok() {
return 10; }
let err_msg = format!("{:#}", result.as_ref().err().unwrap());
if unsafe { libc::setrlimit(libc::RLIMIT_NOFILE, &original_rlimit) } != 0 {
return 15;
}
if !err_msg.contains("pipe2 ") {
eprintln!("unexpected spawn error (exit 14): {err_msg}");
return 14;
}
let after = count_open_fds();
if after > baseline {
return 11; }
if any_zombie_child() {
return 12;
}
0
});
assert_eq!(
code, 0,
"child reported cleanup defect (code {code}): see exit-code table above \
spawn_guard_cleans_up_on_wake_chain_pipe_emfile"
);
}
#[test]
fn spawn_guard_cleans_up_on_fork_eagain() {
let code = run_in_forked_child(|| {
let baseline = count_open_fds();
if !set_rlimit_nproc_zero_headroom() {
return 13;
}
let config = WorkloadConfig {
num_workers: 1,
affinity: AffinityIntent::Inherit,
work_type: WorkType::SpinWait,
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let result = WorkloadHandle::spawn(&config);
if result.is_ok() {
return 10; }
let msg = format!("{:#}", result.err().unwrap());
if !msg.contains("fork failed") {
return 14;
}
let after = count_open_fds();
if after > baseline {
return 11;
}
if any_zombie_child() {
return 12;
}
0
});
assert_eq!(
code, 0,
"child reported cleanup defect (code {code}): see exit-code table above \
spawn_guard_cleans_up_on_fork_eagain"
);
}
#[test]
fn io_sync_write_cleans_up_tempfile_fallback() {
if std::path::Path::new("/dev/vda").exists() {
return;
}
let config = WorkloadConfig {
num_workers: 1,
affinity: AffinityIntent::Inherit,
work_type: WorkType::IoSyncWrite,
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let mut h = WorkloadHandle::spawn(&config).unwrap();
h.start();
std::thread::sleep(std::time::Duration::from_millis(200));
let reports = h.stop_and_collect();
assert_eq!(reports.len(), 1);
let tid = reports[0].tid;
let path = std::env::temp_dir()
.join(format!("ktstr_iodev_{tid}"))
.to_string_lossy()
.to_string();
assert!(
!std::path::Path::new(&path).exists(),
"tempfile fallback {path} should be cleaned up"
);
}