use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
process::Stdio,
sync::{Arc, RwLock},
time::Duration,
};
use qos_nsm::NsmProvider;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, Command},
};
use crate::{
handles::Handles,
io::{HostBridge, IOError, SocketAddress, StreamPool},
protocol::{
ProtocolPhase, ProtocolState,
processor::ProtocolProcessor,
services::boot::{BridgeConfig, RestartPolicy},
},
server::SocketServer,
};
/// Pause applied after the pivot process exits, before the restart policy is
/// consulted to decide whether the pivot is spawned again.
pub const REAPER_RESTART_DELAY: Duration = Duration::from_millis(50);
/// Pause between flagging the server task for shutdown and joining it when the
/// Reaper is exiting.
pub const REAPER_EXIT_DELAY: Duration = Duration::from_secs(3);
/// Polling interval used while waiting for the shared state to change or for
/// the provisioned handles to appear.
const REAPER_STATE_CHECK_DELAY: Duration = Duration::from_millis(100);
/// Runs the protocol server on `core_socket` until the shared state becomes
/// [`InterState::Quitting`].
///
/// The value returned by `SocketServer::listen_all` is kept in a local for the
/// whole lifetime of this future and is dropped when the function returns.
/// The shared state is polled every `REAPER_STATE_CHECK_DELAY`.
///
/// # Panics
///
/// Panics if the single-socket stream pool cannot be created, if the server
/// fails to start listening, or if the state lock is poisoned.
async fn run_server(
    server_state: Arc<RwLock<InterState>>,
    handles: Handles,
    nsm: Box<dyn NsmProvider + Send>,
    core_socket: SocketAddress,
    test_only_init_phase_override: Option<ProtocolPhase>,
) {
    let shared_protocol_state =
        ProtocolState::new(nsm, handles.clone(), test_only_init_phase_override)
            .shared();
    let pool = StreamPool::single(core_socket)
        .expect("unable to create single socket core pool");

    // Bound to a named local (not `_`) so the value stays alive until this
    // function returns.
    let _listeners = SocketServer::listen_all(
        pool,
        ProtocolProcessor::new(shared_protocol_state),
        1,
    )
    .expect("unable to get listen task list for protocol server");

    println!("Reaper::server running");
    loop {
        // Copy the state out in its own statement so the lock guard is
        // released before the task suspends on the sleep below.
        let current = *server_state.read().unwrap();
        if current == InterState::Quitting {
            break;
        }
        tokio::time::sleep(REAPER_STATE_CHECK_DELAY).await;
    }
    println!("Reaper::server shutdown");
}
/// Starts a host bridge for every configured server bridge.
///
/// For each [`BridgeConfig::Server`] entry, a socket address is derived from
/// `core_socket` using the entry's port, a single-socket [`StreamPool`] is
/// created on it, and `HostBridge::vsock_to_tcp` is called on a bridge built
/// from that pool and `127.0.0.1:<port>`. The entry's `host` field is not
/// used; the TCP side is always the IPv4 loopback address.
///
/// When `bridges` is empty, a notice is printed and nothing else happens.
///
/// # Errors
///
/// Returns an [`IOError`] if deriving the per-port socket address or creating
/// the stream pool fails. Bridges started before the failing entry are not
/// undone.
///
/// # Panics
///
/// Panics when a [`BridgeConfig::Client`] entry is encountered, since client
/// bridges are not implemented.
fn run_vsock_to_tcp_bridge(
    core_socket: &SocketAddress,
    bridges: &[BridgeConfig],
) -> Result<(), IOError> {
    if bridges.is_empty() {
        println!("skipping host bridge, not configured");
        return Ok(());
    }

    for bc in bridges {
        match bc {
            BridgeConfig::Server { port, host: _ } => {
                let app_socket = core_socket.with_port(*port)?;
                let host_addr: SocketAddr =
                    SocketAddrV4::new(Ipv4Addr::LOCALHOST, *port).into();
                let app_pool = StreamPool::single(app_socket)?;
                let bridge = HostBridge::new(app_pool, host_addr);
                bridge.vsock_to_tcp();
            }
            BridgeConfig::Client { port: _, host: _ } => {
                panic!("client bridge unimplemented")
            }
        }
    }

    Ok(())
}
/// Re-emits the pivot's piped output on this process's stdout/stderr.
///
/// Takes the child's stdout and stderr handles and spawns a tokio task (its
/// join handle is discarded) that reads both line by line. Stdout lines are
/// printed with a `PIVOT[OUT]: ` prefix and stderr lines are printed to stderr
/// with a `PIVOT[ERR]: ` prefix.
///
/// The task keeps running until BOTH streams have reached end of file, so
/// output still pending on one stream is not lost when the other closes
/// first. Read errors are logged and reading continues.
///
/// # Panics
///
/// Panics if the child's stdout or stderr handle is not available (for
/// example when it was not configured as piped or was already taken).
fn reprint_pivot_output(child: &mut Child) {
    let stdout = child.stdout.take().expect("failed to get pivot stdout");
    let stderr = child.stderr.take().expect("failed to get pivot stderr");
    let mut stdout_lines = BufReader::new(stdout).lines();
    let mut stderr_lines = BufReader::new(stderr).lines();

    tokio::spawn(async move {
        // Track each stream separately: a stream that hit EOF is disabled via
        // the `select!` precondition while the other one is drained.
        let mut stdout_open = true;
        let mut stderr_open = true;

        // The loop condition guarantees at least one branch is enabled, so
        // `select!` never runs with every branch disabled.
        while stdout_open || stderr_open {
            tokio::select! {
                line = stdout_lines.next_line(), if stdout_open => match line {
                    Ok(Some(line)) => println!("PIVOT[OUT]: {line}"),
                    Ok(None) => stdout_open = false,
                    Err(e) => eprintln!("error reading pivot stdout: {e}"),
                },
                line = stderr_lines.next_line(), if stderr_open => match line {
                    Ok(Some(line)) => eprintln!("PIVOT[ERR]: {line}"),
                    Ok(None) => stderr_open = false,
                    Err(e) => eprintln!("error reading pivot stderr: {e}"),
                },
            }
        }
    });
}
/// Supervisor that runs the protocol server and launches the pivot process.
pub struct Reaper;

impl Reaper {
    /// Runs the protocol server, waits for provisioning, then launches and
    /// supervises the pivot process.
    ///
    /// Sequence:
    /// 1. Spawn `run_server` on a tokio task, sharing an `InterState` flag.
    /// 2. Poll `handles` every `REAPER_STATE_CHECK_DELAY` until the quorum
    ///    key, the pivot, and the manifest envelope all exist. If the shared
    ///    state is `Quitting` during this wait, the process exits with
    ///    status 1.
    /// 3. Start the bridges listed in the manifest's bridge configuration.
    /// 4. Spawn the pivot with a cleared environment and the manifest's
    ///    arguments. With the manifest's debug mode on, its stdout/stderr are
    ///    piped and reprinted; otherwise both are sent to null.
    /// 5. After each pivot exit, sleep `REAPER_RESTART_DELAY` and either
    ///    respawn (`RestartPolicy::Always`) or stop (`RestartPolicy::Never`).
    /// 6. Set the shared state to `Quitting`, sleep `REAPER_EXIT_DELAY`, and
    ///    join the server task.
    ///
    /// # Panics
    ///
    /// Panics if the state lock is poisoned, the manifest envelope cannot be
    /// read, the bridges fail to start, the pivot cannot be spawned, or
    /// waiting on the pivot fails.
    pub async fn execute(
        handles: &Handles,
        nsm: Box<dyn NsmProvider + Send>,
        core_socket: SocketAddress,
        test_only_init_phase_override: Option<ProtocolPhase>,
    ) {
        let state = Arc::new(RwLock::new(InterState::Booting));
        let server_task = tokio::spawn(run_server(
            Arc::clone(&state),
            handles.clone(),
            nsm,
            core_socket.clone(),
            test_only_init_phase_override,
        ));

        // Wait until everything needed to start the pivot has been written.
        loop {
            let current = *state.read().unwrap();
            if current == InterState::Quitting {
                eprintln!("quit called by ctrl+c");
                std::process::exit(1);
            }

            let provisioned = handles.quorum_key_exists()
                && handles.pivot_exists()
                && handles.manifest_envelope_exists();
            if provisioned {
                break;
            }

            eprintln!("Reaper::execute waiting for pivot and manifest");
            tokio::time::sleep(REAPER_STATE_CHECK_DELAY).await;
        }

        println!("Reaper::execute about to spawn pivot");

        let manifest = handles
            .get_manifest_envelope()
            .expect("Checked above that the manifest exists.")
            .manifest();
        let pivot_args = manifest.args().to_vec();
        let restart = manifest.restart();
        let bridges = manifest.bridge_config().to_vec();

        run_vsock_to_tcp_bridge(&core_socket, &bridges)
            .expect("failed to run VSOCK -> TCP socket bridge");

        // The pivot starts with an empty environment and only the manifest's
        // arguments.
        let mut pivot_cmd = Command::new(handles.pivot_path());
        pivot_cmd.env_clear().args(pivot_args.iter());

        let (child_stdout, child_stderr) = if manifest.debug_mode() {
            (Stdio::piped(), Stdio::piped())
        } else {
            (Stdio::null(), Stdio::null())
        };
        pivot_cmd.stdout(child_stdout).stderr(child_stderr);

        loop {
            let mut pivot_proc =
                pivot_cmd.spawn().expect("Failed to spawn pivot");
            if manifest.debug_mode() {
                reprint_pivot_output(&mut pivot_proc);
            }

            let status = pivot_proc
                .wait()
                .await
                .expect("Pivot executable never started...");
            println!("Pivot exited with status: {status}");
            tokio::time::sleep(REAPER_RESTART_DELAY).await;

            let respawn = match restart {
                RestartPolicy::Always => true,
                RestartPolicy::Never => false,
            };
            if !respawn {
                break;
            }
            println!("Restarting pivot ...");
        }

        // Tell the server task to stop, give it time to notice, then join it.
        *state.write().unwrap() = InterState::Quitting;
        tokio::time::sleep(REAPER_EXIT_DELAY).await;
        match server_task.await {
            Ok(()) => {}
            Err(err) => {
                eprintln!("Reaper::execute server_worker join error: {err:?}");
            }
        }

        println!("Reaper exiting ... ");
    }
}
/// Flag shared (behind an `Arc<RwLock<_>>`) between `Reaper::execute` and the
/// server task started by it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InterState {
    /// Initial value, set when `Reaper::execute` creates the shared state.
    Booting,
    /// Written by `Reaper::execute` once the pivot loop has ended; `run_server`
    /// stops polling and returns when it observes this value.
    Quitting,
}