use anyhow::anyhow;
use async_process::{Child, Command};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use serde::Deserialize;
use serde_json::{json, Value};
use std::{
collections::HashMap,
io::ErrorKind::InvalidInput,
net::{SocketAddr, TcpStream},
time::Duration,
};
use tokio::time::sleep;
#[derive(Debug, Default, Deserialize)]
struct Request {
program: String,
#[serde(default)]
arguments: Vec<String>,
#[serde(default)]
environment: HashMap<String, String>,
#[serde(default)]
ports: Vec<u16>,
}
async fn poll_for_exit_condition(
mut child: Child,
program: &str,
ports: &[u16],
) -> Result<(), anyhow::Error> {
let addresses: Vec<SocketAddr> = ports
.iter()
.map(|port| SocketAddr::from(([127, 0, 0, 1], *port)))
.collect();
let tenth_of_a_second = Duration::from_millis(100);
loop {
sleep(tenth_of_a_second).await;
if let Some(status) = child.try_status()? {
if status.success() {
println!("Child {} exited successfully.", program);
return Ok(());
} else {
println!("Child {} failed.", program);
return Err(anyhow!("{} process failed!", program));
}
}
if let Ok(stream) = TcpStream::connect(&addresses[..]) {
if let Ok(addr) = stream.peer_addr() {
println!("Connected to port {}: declaring success.", addr.port());
} else {
println!("Connected to port: declaring success.");
}
match child.kill() {
Err(error) if error.kind() != InvalidInput => {
return Err(anyhow!("Failed to kill {}: {}", program, error));
}
_ => return Ok(()),
}
}
}
}
async fn wait_for_exit(mut child: Child, program: &str) -> Result<(), anyhow::Error> {
let status = child.status().await?;
if status.success() {
println!("Child {} exited successfully.", program);
Ok(())
} else {
println!("Child {} failed.", program);
Err(anyhow!("{} process failed!", program))
}
}
fn completion_value(program: &str) -> Result<Value, Error> {
Ok(json!({ program: "success" }))
}
async fn run_program(request: &Request) -> Result<Value, Error> {
println!("Running {} {:?}.", request.program, request.arguments);
let child = Command::new(&request.program)
.args(&request.arguments)
.envs(&request.environment)
.spawn()?;
println!("Waiting for {} completion.", &request.program);
if request.ports.is_empty() {
wait_for_exit(child, &request.program).await?;
} else {
poll_for_exit_condition(child, &request.program, &request.ports).await?;
}
completion_value(&request.program)
}
async fn proxy(event: LambdaEvent<Request>) -> Result<Value, Error> {
let (request, _context) = event.into_parts();
run_program(&request).await
}
#[tokio::main]
async fn main() -> Result<(), Error> {
let handler = service_fn(proxy);
run(handler).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::env::consts::OS;
use tokio;
#[tokio::test]
async fn simple_success() {
let request = Request {
program: "true".to_string(),
arguments: vec![],
environment: HashMap::new(),
ports: vec![],
};
let result = run_program(&request).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn simple_failure() {
let request = Request {
program: "false".to_string(),
arguments: vec![],
environment: HashMap::new(),
ports: vec![],
};
let result = run_program(&request).await;
assert!(result.is_err());
}
#[tokio::test]
async fn polled_success() {
let request = Request {
program: "true".to_string(),
arguments: vec![],
environment: HashMap::new(),
ports: vec![8080],
};
let result = run_program(&request).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn polled_failure() {
let request = Request {
program: "false".to_string(),
arguments: vec![],
environment: HashMap::new(),
ports: vec![8080],
};
let result = run_program(&request).await;
assert!(result.is_err());
}
#[tokio::test]
async fn exit_on_open_port() {
let arguments = if OS == "linux" {
vec!["-lp".to_string(), "8080".to_string()]
} else {
vec!["-l".to_string(), "8080".to_string()]
};
let request = Request {
program: "nc".to_string(),
arguments,
ports: vec![8080],
..Default::default()
};
let result = run_program(&request).await;
assert!(result.is_ok());
}
}