use std::io::Read;
use std::net::TcpStream;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
static FUTURE_ID: AtomicU64 = AtomicU64::new(1);
fn next_future_id() -> u64 {
FUTURE_ID.fetch_add(1, Ordering::Relaxed)
}
use crate::compose::ServiceSpec;
use crate::container::{Command, Namespace, Stdio, Volume};
use crate::image;
use crate::network::NetworkMode;
use super::env::Env;
use super::value::{LispError, Value};
pub fn register_runtime_builtins(
env: &Env,
registry: Arc<Mutex<Vec<(String, i32)>>>,
thread_registry: Arc<Mutex<Vec<std::thread::JoinHandle<()>>>>,
project: String,
compose_dir: PathBuf,
) {
let project = Rc::new(project);
let compose_dir = Rc::new(compose_dir);
{
let registry = Arc::clone(®istry);
let thread_registry = Arc::clone(&thread_registry);
let project = Rc::clone(&project);
let compose_dir = Rc::clone(&compose_dir);
native(env, "container-start", move |args| {
if args.is_empty() {
return Err(LispError::new(
"container-start: expected (service-spec [:env list])",
));
}
let mut svc = extract_service_spec("container-start", &args[0])?;
let mut i = 1;
while i < args.len() {
match &args[i] {
Value::Symbol(s) if s == ":env" => {
i += 1;
let env_list = args
.get(i)
.ok_or_else(|| LispError::new("container-start: :env requires a list"))?
.clone();
apply_inject_env(&mut svc, env_list, "container-start")?;
i += 1;
}
other => {
return Err(LispError::new(format!(
"container-start: unexpected argument: {}",
other
)))
}
}
}
do_container_start(svc, &project, &compose_dir, ®istry, &thread_registry)
});
}
{
let registry = Arc::clone(®istry);
let thread_registry = Arc::clone(&thread_registry);
let project = Rc::clone(&project);
let compose_dir = Rc::clone(&compose_dir);
native(env, "container-start-bg", move |args| {
if args.is_empty() {
return Err(LispError::new(
"container-start-bg: expected (service-spec [:env list])",
));
}
let mut svc = extract_service_spec("container-start-bg", &args[0])?;
let mut i = 1;
while i < args.len() {
match &args[i] {
Value::Symbol(s) if s == ":env" => {
i += 1;
let env_list = args
.get(i)
.ok_or_else(|| {
LispError::new("container-start-bg: :env requires a list")
})?
.clone();
apply_inject_env(&mut svc, env_list, "container-start-bg")?;
i += 1;
}
other => {
return Err(LispError::new(format!(
"container-start-bg: unexpected argument: {}",
other
)))
}
}
}
let registry2 = Arc::clone(®istry);
let thread_registry2 = Arc::clone(&thread_registry);
let project_str = (*project).clone();
let compose_dir_path = compose_dir.to_path_buf();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let result = do_container_start_inner(
svc,
&project_str,
&compose_dir_path,
®istry2,
&thread_registry2,
)
.map(|r| (r.name, r.pid, r.ip))
.map_err(|e| e.message);
let _ = tx.send(result);
});
use crate::lisp::value::{PendingRx, Value as V};
let pending: PendingRx = std::sync::Arc::new(std::sync::Mutex::new(Some(rx)));
Ok(V::PendingContainer(pending))
});
}
native(env, "container-join", |args| {
if args.len() != 1 {
return Err(LispError::new(
"container-join: expected 1 argument (pending-container)",
));
}
match &args[0] {
Value::PendingContainer(arc) => {
let rx = arc
.lock()
.unwrap()
.take()
.ok_or_else(|| LispError::new("container-join: already joined"))?;
let (name, pid, ip) = rx
.recv()
.map_err(|_| LispError::new("container-join: background thread panicked"))?
.map_err(LispError::new)?;
Ok(Value::ContainerHandle {
name,
pid,
ip,
deps: vec![],
})
}
other => Err(LispError::new(format!(
"container-join: expected pending-container, got {}",
other.type_name()
))),
}
});
native(env, "start", |args| {
if args.is_empty() {
return Err(LispError::new(
"start: expected (svc [:needs list] [:env lambda])",
));
}
let svc = extract_service_spec("start", &args[0])?;
let mut after: Vec<Value> = Vec::new();
let mut inject: Option<Box<Value>> = None;
let mut i = 1;
while i < args.len() {
match &args[i] {
Value::Symbol(s) if s == ":needs" => {
i += 1;
let deps = args
.get(i)
.ok_or_else(|| LispError::new("start: :needs requires a list"))?
.to_vec()
.map_err(|_| LispError::new("start: :needs requires a list"))?;
for dep in deps {
match &dep {
Value::Future { .. } => after.push(dep),
other => {
return Err(LispError::new(format!(
"start: :needs requires futures, got {}",
other.type_name()
)))
}
}
}
i += 1;
}
Value::Symbol(s) if s == ":env" => {
i += 1;
let f = args
.get(i)
.ok_or_else(|| LispError::new("start: :env requires a lambda"))?
.clone();
match &f {
Value::Lambda { .. } | Value::Native(_, _) => {}
other => {
return Err(LispError::new(format!(
"start: :env requires a lambda, got {}",
other.type_name()
)))
}
}
inject = Some(Box::new(f));
i += 1;
}
other => {
return Err(LispError::new(format!(
"start: unexpected argument: {}",
other
)))
}
}
}
use crate::lisp::value::FutureKind;
Ok(Value::Future {
id: next_future_id(),
name: svc.name.clone(),
kind: FutureKind::Container {
spec: Box::new(svc),
inject,
},
after,
})
});
native(env, "then", |args| {
if args.len() < 2 {
return Err(LispError::new(
"then: expected (future transform-lambda [:name string])",
));
}
let upstream_name = match &args[0] {
Value::Future { name, .. } => name.clone(),
other => {
return Err(LispError::new(format!(
"then: expected future, got {}",
other.type_name()
)))
}
};
match &args[1] {
Value::Lambda { .. } | Value::Native(_, _) => {}
other => {
return Err(LispError::new(format!(
"then: expected lambda, got {}",
other.type_name()
)))
}
}
let name = if args.len() >= 4 {
match (&args[2], &args[3]) {
(Value::Symbol(k), Value::Str(n)) if k == ":name" => n.clone(),
_ => format!("{}-then", upstream_name),
}
} else {
format!("{}-then", upstream_name)
};
use crate::lisp::value::FutureKind;
let upstream = args[0].clone();
Ok(Value::Future {
id: next_future_id(),
name,
kind: FutureKind::Transform {
upstream: Box::new(upstream.clone()),
transform: Box::new(args[1].clone()),
},
after: vec![upstream],
})
});
native(env, "then-all", |args| {
if args.len() != 2 {
return Err(LispError::new(
"then-all: expected (list-of-futures lambda)",
));
}
let futures_list = args[0]
.to_vec()
.map_err(|_| LispError::new("then-all: first argument must be a list of futures"))?;
let mut upstreams: Vec<Value> = Vec::new();
let mut name_parts: Vec<String> = Vec::new();
for f in futures_list {
match &f {
Value::Future { name, .. } => {
name_parts.push(name.clone());
upstreams.push(f);
}
other => {
return Err(LispError::new(format!(
"then-all: expected futures in list, got {}",
other.type_name()
)))
}
}
}
match &args[1] {
Value::Lambda { .. } | Value::Native(_, _) => {}
other => {
return Err(LispError::new(format!(
"then-all: expected lambda, got {}",
other.type_name()
)))
}
}
use crate::lisp::value::FutureKind;
Ok(Value::Future {
id: next_future_id(),
name: format!("join({})", name_parts.join(",")),
kind: FutureKind::Join {
transform: Box::new(args[1].clone()),
},
after: upstreams,
})
});
{
let registry = Arc::clone(®istry);
let thread_registry = Arc::clone(&thread_registry);
let project = Rc::clone(&project);
let compose_dir = Rc::clone(&compose_dir);
native(env, "run", move |args| {
if args.is_empty() {
return Err(LispError::new(
"run: expected (futures-list [:parallel] [:max-parallel N])",
));
}
let future_list = args[0]
.to_vec()
.map_err(|_| LispError::new("run: argument must be a list of futures"))?;
let mut parallel = false;
let mut max_parallel: Option<usize> = None;
let mut ki = 1;
while ki < args.len() {
match &args[ki] {
Value::Symbol(s) if s == ":parallel" => {
parallel = true;
ki += 1;
}
Value::Symbol(s) if s == ":max-parallel" => {
ki += 1;
match args.get(ki) {
Some(Value::Int(n)) if *n > 0 => {
max_parallel = Some(*n as usize);
parallel = true;
}
_ => {
return Err(LispError::new(
"run: :max-parallel requires a positive integer",
))
}
}
ki += 1;
}
other => {
return Err(LispError::new(format!(
"run: unexpected argument: {}",
other
)))
}
}
}
use super::eval::eval_apply;
use crate::lisp::value::FutureKind;
struct Entry {
id: u64,
kind: FutureKind,
after_ids: Vec<u64>,
}
for v in &future_list {
if !matches!(v, Value::Future { .. }) {
return Err(LispError::new(format!(
"run: expected futures, got {}",
v.type_name()
)));
}
}
fn collect_transitive(
v: &Value,
seen: &mut std::collections::HashSet<u64>,
all: &mut Vec<Value>,
) {
if let Value::Future { id, after, .. } = v {
if seen.insert(*id) {
for dep in after {
collect_transitive(dep, seen, all);
}
all.push(v.clone());
}
}
}
let mut all_futures: Vec<Value> = Vec::new();
let mut seen_ids: std::collections::HashSet<u64> = std::collections::HashSet::new();
for v in &future_list {
collect_transitive(v, &mut seen_ids, &mut all_futures);
}
let mut entries: Vec<Entry> = Vec::new();
for v in all_futures {
if let Value::Future {
id,
name,
kind,
after,
} = v
{
let after_ids = after.iter().filter_map(Value::future_id).collect();
entries.push(Entry {
id,
kind,
after_ids,
});
let _ = name; }
}
let n = entries.len();
let id_to_idx: std::collections::HashMap<u64, usize> =
entries.iter().enumerate().map(|(i, e)| (e.id, i)).collect();
let mut in_degree = vec![0usize; n];
let mut dependents: Vec<Vec<usize>> = vec![vec![]; n];
for (i, e) in entries.iter().enumerate() {
for dep_id in &e.after_ids {
if let Some(&dep_idx) = id_to_idx.get(dep_id) {
in_degree[i] += 1;
dependents[dep_idx].push(i);
}
}
}
let mut ready: Vec<usize> = (0..n).filter(|&i| in_degree[i] == 0).collect();
let mut tiers: Vec<Vec<usize>> = Vec::new();
while !ready.is_empty() {
let tier = std::mem::take(&mut ready);
for &i in &tier {
for &j in &dependents[i] {
in_degree[j] -= 1;
if in_degree[j] == 0 {
ready.push(j);
}
}
}
tiers.push(tier);
}
if tiers.iter().map(|t| t.len()).sum::<usize>() != n {
return Err(LispError::new("run: dependency cycle detected"));
}
let exec_pos: std::collections::HashMap<u64, usize> = tiers
.iter()
.flatten()
.enumerate()
.map(|(pos, &idx)| (entries[idx].id, pos))
.collect();
let mut resolved: std::collections::HashMap<u64, Value> =
std::collections::HashMap::new();
if !parallel {
for tier in &tiers {
for &idx in tier {
let e = &entries[idx];
let result = match &e.kind {
FutureKind::Container { spec, inject } => {
let mut spec = *spec.clone();
if let Some(inject_fn) = inject {
let dep_vals: Vec<Value> = e
.after_ids
.iter()
.map(|id| resolved.get(id).cloned().unwrap_or(Value::Nil))
.collect();
let env_list = eval_apply(inject_fn, &dep_vals)?;
apply_inject_env(&mut spec, env_list, "run")?;
}
do_container_start(
spec,
&project,
&compose_dir,
®istry,
&thread_registry,
)?
}
FutureKind::Transform {
upstream,
transform,
} => {
let upstream_id = upstream.future_id().unwrap_or(0);
let upstream_val =
resolved.get(&upstream_id).cloned().unwrap_or(Value::Nil);
let result = eval_apply(transform, &[upstream_val])?;
match result {
Value::Future { .. } => resolve_dynamic(
result,
&mut resolved,
&project,
&compose_dir,
®istry,
&thread_registry,
)?,
other => other,
}
}
FutureKind::Join { transform } => {
let upstream_vals: Vec<Value> = e
.after_ids
.iter()
.map(|id| resolved.get(id).cloned().unwrap_or(Value::Nil))
.collect();
let result = eval_apply(transform, &upstream_vals)?;
match result {
Value::Future { .. } => resolve_dynamic(
result,
&mut resolved,
&project,
&compose_dir,
®istry,
&thread_registry,
)?,
other => other,
}
}
};
resolved.insert(e.id, result);
}
}
} else {
let chunk_size = max_parallel.unwrap_or(0);
for tier in &tiers {
let mut tier_results: Vec<(usize, Value)> = Vec::new();
let mut container_jobs: Vec<(usize, ServiceSpec)> = Vec::new();
for &idx in tier {
let e = &entries[idx];
match &e.kind {
FutureKind::Container { spec, inject } => {
let mut spec = *spec.clone();
if let Some(inject_fn) = inject {
let dep_vals: Vec<Value> = e
.after_ids
.iter()
.map(|id| resolved.get(id).cloned().unwrap_or(Value::Nil))
.collect();
let env_list = eval_apply(inject_fn, &dep_vals)?;
apply_inject_env(&mut spec, env_list, "run")?;
}
container_jobs.push((idx, spec));
}
FutureKind::Transform {
upstream,
transform,
} => {
let upstream_id = upstream.future_id().unwrap_or(0);
let upstream_val =
resolved.get(&upstream_id).cloned().unwrap_or(Value::Nil);
let result = eval_apply(transform, &[upstream_val])?;
let result = match result {
Value::Future { .. } => resolve_dynamic(
result,
&mut resolved,
&project,
&compose_dir,
®istry,
&thread_registry,
)?,
other => other,
};
tier_results.push((idx, result));
}
FutureKind::Join { transform } => {
let upstream_vals: Vec<Value> = e
.after_ids
.iter()
.map(|id| resolved.get(id).cloned().unwrap_or(Value::Nil))
.collect();
let result = eval_apply(transform, &upstream_vals)?;
let result = match result {
Value::Future { .. } => resolve_dynamic(
result,
&mut resolved,
&project,
&compose_dir,
®istry,
&thread_registry,
)?,
other => other,
};
tier_results.push((idx, result));
}
}
}
let effective_chunk = if chunk_size == 0 {
container_jobs.len().max(1)
} else {
chunk_size
};
for chunk in container_jobs.chunks(effective_chunk) {
let mut handles: Vec<(
usize,
std::thread::JoinHandle<Result<SpawnResult, LispError>>,
)> = Vec::new();
for (idx, spec) in chunk {
let idx = *idx;
let spec = spec.clone();
let project_owned = (*project).clone();
let compose_dir_owned = (*compose_dir).clone();
let registry_arc = Arc::clone(®istry);
let thread_registry_arc = Arc::clone(&thread_registry);
let handle = std::thread::spawn(move || {
do_container_start_inner(
spec,
&project_owned,
&compose_dir_owned,
®istry_arc,
&thread_registry_arc,
)
});
handles.push((idx, handle));
}
for (idx, handle) in handles {
match handle.join() {
Ok(Ok(r)) => {
let val = Value::ContainerHandle {
name: r.name,
pid: r.pid,
ip: r.ip,
deps: vec![],
};
tier_results.push((idx, val));
}
Ok(Err(e)) => return Err(e),
Err(_) => {
return Err(LispError::new("run: a worker thread panicked"))
}
}
}
}
tier_results.sort_by_key(|(idx, _)| *idx);
for (idx, val) in tier_results {
resolved.insert(entries[idx].id, val);
}
}
}
fn container_deps(
id: u64,
entries: &[Entry],
id_to_idx: &std::collections::HashMap<u64, usize>,
exec_pos: &std::collections::HashMap<u64, usize>,
resolved: &std::collections::HashMap<u64, Value>,
) -> Vec<Value> {
let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
let mut stack = vec![id];
let mut dep_ids: Vec<u64> = Vec::new();
while let Some(cur) = stack.pop() {
if !visited.insert(cur) {
continue;
}
if let Some(&idx) = id_to_idx.get(&cur) {
for &dep_id in &entries[idx].after_ids {
stack.push(dep_id);
}
if cur != id {
if let Some(e) = id_to_idx
.get(&cur)
.map(|&i| &entries[i])
.filter(|e| matches!(e.kind, FutureKind::Container { .. }))
{
if resolved.contains_key(&e.id) {
dep_ids.push(e.id);
}
}
}
}
}
dep_ids
.sort_by_key(|did| std::cmp::Reverse(exec_pos.get(did).copied().unwrap_or(0)));
dep_ids
.into_iter()
.filter_map(|did| resolved.get(&did).cloned())
.collect()
}
let mut pairs: Vec<Value> = Vec::new();
for v in &future_list {
if let Value::Future { id, name, kind, .. } = v {
if let Some(resolved_val) = resolved.get(id) {
let final_val = if matches!(kind, FutureKind::Container { .. }) {
let deps =
container_deps(*id, &entries, &id_to_idx, &exec_pos, &resolved);
match resolved_val {
Value::ContainerHandle { name, pid, ip, .. } => {
Value::ContainerHandle {
name: name.clone(),
pid: *pid,
ip: ip.clone(),
deps,
}
}
other => other.clone(),
}
} else {
resolved_val.clone()
};
pairs.push(Value::Pair(Rc::new((Value::Str(name.clone()), final_val))));
}
}
}
Ok(Value::list(pairs.into_iter()))
});
}
{
let registry = Arc::clone(®istry);
let thread_registry = Arc::clone(&thread_registry);
let project = Rc::clone(&project);
let compose_dir = Rc::clone(&compose_dir);
native(env, "resolve", move |args| {
if args.len() != 1 {
return Err(LispError::new("resolve: expected (future)"));
}
match &args[0] {
Value::Future { .. } => {}
other => {
return Err(LispError::new(format!(
"resolve: expected future, got {}",
other.type_name()
)))
}
}
let mut resolved = std::collections::HashMap::new();
resolve_dynamic(
args[0].clone(),
&mut resolved,
&project,
&compose_dir,
®istry,
&thread_registry,
)
});
}
{
let registry = Arc::clone(®istry);
let thread_registry = Arc::clone(&thread_registry);
let project = Rc::clone(&project);
let compose_dir = Rc::clone(&compose_dir);
native(env, "await", move |args| {
if args.is_empty() {
return Err(LispError::new(
"await: expected (future [:port P] [:timeout T])",
));
}
use crate::lisp::value::FutureKind;
let svc = match &args[0] {
Value::Future {
kind: FutureKind::Container { spec, .. },
..
} => *spec.clone(),
Value::Future {
kind: FutureKind::Transform { .. } | FutureKind::Join { .. },
..
} => {
return Err(LispError::new(
"await: Transform and Join futures must be executed via run or resolve",
))
}
other => {
return Err(LispError::new(format!(
"await: expected future, got {}",
other.type_name()
)))
}
};
let mut port: Option<u16> = None;
let mut timeout_secs = 60.0f64;
let mut i = 1;
while i < args.len() {
match &args[i] {
Value::Symbol(s) if s == ":port" => {
i += 1;
port = Some(match args.get(i) {
Some(Value::Int(n)) => *n as u16,
_ => return Err(LispError::new("await: :port requires an integer")),
});
i += 1;
}
Value::Symbol(s) if s == ":timeout" => {
i += 1;
timeout_secs = match args.get(i) {
Some(Value::Int(n)) => *n as f64,
Some(Value::Float(f)) => *f,
_ => return Err(LispError::new("await: :timeout requires a number")),
};
i += 1;
}
other => {
return Err(LispError::new(format!(
"await: unexpected argument: {}",
other
)))
}
}
}
let handle =
do_container_start(svc, &project, &compose_dir, ®istry, &thread_registry)?;
if let Some(p) = port {
let ip = match &handle {
Value::ContainerHandle { ip: Some(ip), .. } => ip.clone(),
_ => "127.0.0.1".to_string(),
};
let container_name = match &handle {
Value::ContainerHandle { name, .. } => name.clone(),
_ => "unknown".to_string(),
};
let addr = format!("{}:{}", ip, p);
let deadline = Instant::now() + Duration::from_secs_f64(timeout_secs);
loop {
if TcpStream::connect_timeout(
&addr.parse().map_err(|e| {
LispError::new(format!("await: invalid address '{}': {}", addr, e))
})?,
Duration::from_millis(250),
)
.is_ok()
{
break;
}
if Instant::now() >= deadline {
return Err(LispError::new(format!(
"await: '{}' port {} did not open within {}s",
container_name, p, timeout_secs
)));
}
std::thread::sleep(Duration::from_millis(250));
}
}
Ok(handle)
});
}
{
let registry = Arc::clone(®istry);
native(env, "container-stop", move |args| {
if args.len() != 1 {
return Err(LispError::new(
"container-stop: expected 1 argument (container-handle)",
));
}
stop_cascade(&args[0], ®istry)?;
Ok(Value::Nil)
});
}
{
let registry = Arc::clone(®istry);
native(env, "container-wait", move |args| {
if args.len() != 1 {
return Err(LispError::new(
"container-wait: expected 1 argument (container-handle)",
));
}
let (_, pid) = extract_handle("container-wait", &args[0])?;
loop {
match nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), None) {
Err(nix::errno::Errno::ESRCH) => break,
Err(_) => break,
Ok(()) => std::thread::sleep(Duration::from_millis(100)),
}
}
if let Value::ContainerHandle { name, deps, .. } = &args[0] {
registry.lock().unwrap().retain(|(n, _)| n != name);
for dep in deps {
stop_cascade(dep, ®istry)?;
}
}
Ok(Value::Int(0))
});
}
{
let registry = Arc::clone(®istry);
let thread_registry = Arc::clone(&thread_registry);
let project = Rc::clone(&project);
let compose_dir = Rc::clone(&compose_dir);
native(env, "container-run", move |args| {
if args.len() != 1 {
return Err(LispError::new(
"container-run: expected 1 argument (service-spec)",
));
}
let svc = extract_service_spec("container-run", &args[0])?;
let handle =
do_container_start(svc, &project, &compose_dir, ®istry, &thread_registry)?;
let (name, pid) = extract_handle("container-run", &handle)?;
loop {
match nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), None) {
Err(nix::errno::Errno::ESRCH) => break,
_ => std::thread::sleep(Duration::from_millis(100)),
}
}
registry.lock().unwrap().retain(|(n, _)| n != &name);
Ok(Value::Int(0))
});
}
native(env, "container-ip", |args| {
if args.len() != 1 {
return Err(LispError::new(
"container-ip: expected 1 argument (container-handle)",
));
}
match &args[0] {
Value::ContainerHandle { ip, .. } => match ip {
Some(s) => Ok(Value::Str(s.clone())),
None => Ok(Value::Nil),
},
a => Err(LispError::new(format!(
"container-ip: expected container, got {}",
a.type_name()
))),
}
});
native(env, "container-status", |args| {
if args.len() != 1 {
return Err(LispError::new(
"container-status: expected 1 argument (container-handle)",
));
}
let (_, pid) = extract_handle("container-status", &args[0])?;
let alive = nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), None).is_ok();
Ok(Value::Str(if alive { "running" } else { "exited" }.into()))
});
native(env, "await-port", |args| {
if args.len() < 2 || args.len() > 3 {
return Err(LispError::new(
"await-port: expected (host port [timeout-secs])",
));
}
let host = match &args[0] {
Value::Str(s) => s.clone(),
a => {
return Err(LispError::new(format!(
"await-port: expected string host, got {}",
a.type_name()
)))
}
};
let port = match &args[1] {
Value::Int(n) => *n as u16,
a => {
return Err(LispError::new(format!(
"await-port: expected integer port, got {}",
a.type_name()
)))
}
};
let timeout_secs = if args.len() == 3 {
match &args[2] {
Value::Int(n) => *n as f64,
Value::Float(f) => *f,
a => {
return Err(LispError::new(format!(
"await-port: expected number timeout, got {}",
a.type_name()
)))
}
}
} else {
60.0
};
let addr = format!("{}:{}", host, port);
let deadline = Instant::now() + Duration::from_secs_f64(timeout_secs);
loop {
if TcpStream::connect_timeout(
&addr.parse().map_err(|e| {
LispError::new(format!("await-port: invalid address '{}': {}", addr, e))
})?,
Duration::from_millis(250),
)
.is_ok()
{
return Ok(Value::Bool(true));
}
if Instant::now() >= deadline {
return Ok(Value::Bool(false));
}
std::thread::sleep(Duration::from_millis(250));
}
});
}
fn resolve_dynamic(
future: Value,
resolved: &mut std::collections::HashMap<u64, Value>,
project: &str,
compose_dir: &std::path::Path,
registry: &Arc<Mutex<Vec<(String, i32)>>>,
thread_registry: &Arc<Mutex<Vec<std::thread::JoinHandle<()>>>>,
) -> Result<Value, LispError> {
use super::eval::eval_apply;
use crate::lisp::value::FutureKind;
match future {
Value::Future {
id, kind, after, ..
} => {
if let Some(cached) = resolved.get(&id) {
return Ok(cached.clone());
}
let mut after_vals: Vec<Value> = Vec::new();
for dep_fut in after {
let val = resolve_dynamic(
dep_fut,
resolved,
project,
compose_dir,
registry,
thread_registry,
)?;
after_vals.push(val);
}
let result = match kind {
FutureKind::Container { spec, inject } => {
let mut spec = *spec;
if let Some(inj) = inject {
let env_list = eval_apply(&inj, &after_vals)?;
apply_inject_env(&mut spec, env_list, "resolve")?;
}
do_container_start(spec, project, compose_dir, registry, thread_registry)?
}
FutureKind::Transform {
upstream,
transform,
} => {
let upstream_val = resolve_dynamic(
*upstream,
resolved,
project,
compose_dir,
registry,
thread_registry,
)?;
let result = eval_apply(&transform, &[upstream_val])?;
match result {
Value::Future { .. } => resolve_dynamic(
result,
resolved,
project,
compose_dir,
registry,
thread_registry,
)?,
other => other,
}
}
FutureKind::Join { transform } => {
let result = eval_apply(&transform, &after_vals)?;
match result {
Value::Future { .. } => resolve_dynamic(
result,
resolved,
project,
compose_dir,
registry,
thread_registry,
)?,
other => other,
}
}
};
resolved.insert(id, result.clone());
Ok(result)
}
other => Ok(other),
}
}
struct SpawnResult {
name: String,
pid: i32,
ip: Option<String>,
}
fn do_container_start(
svc: ServiceSpec,
project: &str,
compose_dir: &std::path::Path,
registry: &Arc<Mutex<Vec<(String, i32)>>>,
thread_registry: &Arc<Mutex<Vec<std::thread::JoinHandle<()>>>>,
) -> Result<Value, LispError> {
let r = do_container_start_inner(svc, project, compose_dir, registry, thread_registry)?;
Ok(Value::ContainerHandle {
name: r.name,
pid: r.pid,
ip: r.ip,
deps: vec![],
})
}
fn do_container_start_inner(
svc: ServiceSpec,
project: &str,
compose_dir: &std::path::Path,
registry: &Arc<Mutex<Vec<(String, i32)>>>,
thread_registry: &Arc<Mutex<Vec<std::thread::JoinHandle<()>>>>,
) -> Result<SpawnResult, LispError> {
let image_ref = &svc.image;
let (_, manifest) = resolve_image(image_ref)?;
let layers = image::layer_dirs(&manifest);
if layers.is_empty() {
return Err(LispError::new(format!(
"container-start: service '{}': image has no layers",
svc.name
)));
}
let layer_dirs = layers.clone();
let exe_and_args = if let Some(ref cmd) = svc.command {
cmd.clone()
} else {
let mut cmd_vec = manifest.config.entrypoint.clone();
cmd_vec.extend(manifest.config.cmd.clone());
if cmd_vec.is_empty() {
vec!["/bin/sh".to_string()]
} else {
cmd_vec
}
};
let exe = &exe_and_args[0];
let rest = &exe_and_args[1..];
let container_name = format!("{}-{}", project, svc.name);
let mut cmd = Command::new(exe).args(rest).with_image_layers(layers);
for env_str in &manifest.config.env {
if let Some((k, v)) = env_str.split_once('=') {
cmd = cmd.env(k, v);
}
}
if !manifest.config.working_dir.is_empty() && svc.workdir.is_none() {
cmd = cmd.with_cwd(&manifest.config.working_dir);
}
if svc.user.is_none() && !manifest.config.user.is_empty() {
if let Ok((uid, gid)) = parse_user_in_layers(&manifest.config.user, &layer_dirs) {
cmd = cmd.with_uid(uid);
if let Some(g) = gid {
cmd = cmd.with_gid(g);
}
}
}
let svc_network_names: Vec<String> = svc
.networks
.iter()
.map(|n| scoped_network_name(project, n))
.collect();
for net_name in &svc_network_names {
crate::network::ensure_network(net_name).map_err(|e| {
LispError::new(format!(
"container-start: failed to ensure network '{}': {}",
net_name, e
))
})?;
}
if let Some(primary) = svc_network_names.first() {
cmd = cmd.with_network(NetworkMode::BridgeNamed(primary.clone()));
}
for additional in svc_network_names.iter().skip(1) {
cmd = cmd.with_additional_network(additional);
}
if !svc_network_names.is_empty() {
cmd = cmd.with_nat();
}
for vol in &svc.volumes {
let scoped = format!("{}-{}", project, vol.name);
let v = Volume::open(&scoped)
.or_else(|_| Volume::create(&scoped))
.map_err(|e| LispError::new(format!("container-start: volume '{}': {}", scoped, e)))?;
cmd = cmd.with_volume(&v, &vol.mount_path);
}
for bm in &svc.bind_mounts {
let host = if std::path::Path::new(&bm.host_path).is_relative() {
compose_dir
.join(&bm.host_path)
.canonicalize()
.map_err(|e| {
LispError::new(format!(
"container-start: bind-mount host path '{}': {}",
bm.host_path, e
))
})?
.to_string_lossy()
.into_owned()
} else {
bm.host_path.clone()
};
if bm.read_only {
cmd = cmd.with_bind_mount_ro(&host, &bm.container_path);
} else {
cmd = cmd.with_bind_mount(&host, &bm.container_path);
}
}
for path in &svc.tmpfs_mounts {
cmd = cmd.with_tmpfs(path, "");
}
for (k, v) in &svc.env {
cmd = cmd.env(k, v);
}
let image_sets_path = manifest.config.env.iter().any(|e| e.starts_with("PATH="));
if !image_sets_path {
cmd = cmd.env(
"PATH",
"/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
);
}
for port in &svc.ports {
cmd = cmd.with_port_forward(port.host, port.container);
}
if let Some(ref mem) = svc.memory {
if let Ok(bytes) = parse_memory(mem) {
cmd = cmd.with_cgroup_memory(bytes);
}
}
if let Some(ref cpus) = svc.cpus {
if let Ok((quota, period)) = parse_cpus(cpus) {
cmd = cmd.with_cgroup_cpu_quota(quota, period);
}
}
if let Some(ref u) = svc.user {
if let Ok((uid, gid)) = parse_user_in_layers(u, &layer_dirs) {
cmd = cmd.with_uid(uid);
if let Some(g) = gid {
cmd = cmd.with_gid(g);
}
}
}
if let Some(ref w) = svc.workdir {
cmd = cmd.with_cwd(w);
}
let ns = cmd.namespaces();
cmd = cmd
.with_namespaces(ns | Namespace::PID | Namespace::UTS | Namespace::IPC)
.with_hostname(&container_name);
{
use crate::container::Capability;
let drop_all = svc.cap_drop.iter().any(|c| c.eq_ignore_ascii_case("ALL"));
let mut effective = if drop_all {
Capability::empty()
} else {
Capability::DEFAULT_CAPS
};
if !drop_all {
for name in &svc.cap_drop {
let n = name.to_uppercase().replace('-', "_");
let n = n.trim_start_matches("CAP_");
match Capability::from_name(n) {
Some(cap) => effective &= !cap,
None => log::warn!("cap-drop: unknown capability '{}' — skipping", name),
}
}
}
for name in &svc.cap_add {
let n = name.to_uppercase().replace('-', "_");
let n = n.trim_start_matches("CAP_");
match Capability::from_name(n) {
Some(cap) => effective |= cap,
None => log::warn!("cap-add: unknown capability '{}' — skipping", name),
}
}
cmd = cmd
.with_seccomp_default()
.with_capabilities(effective)
.with_no_new_privileges(true)
.with_masked_paths_default();
}
cmd = cmd
.stdin(Stdio::Null)
.stdout(Stdio::Piped)
.stderr(Stdio::Piped);
let mut child = cmd.spawn().map_err(|e| {
LispError::new(format!(
"container-start: spawn '{}' failed: {}",
svc.name, e
))
})?;
let pid = child.pid();
let ip = child.container_ip();
let all_ips: Vec<(String, String)> = child
.container_ips()
.into_iter()
.map(|(name, ip)| (name.to_string(), ip))
.collect();
for (net_name, ip_str) in &all_ips {
let ip_addr: std::net::Ipv4Addr = match ip_str.parse() {
Ok(ip) => ip,
Err(_) => continue,
};
let net_def = match crate::network::load_network_def(net_name) {
Ok(d) => d,
Err(_) => continue,
};
if let Err(e) = crate::dns::dns_add_entry(
net_name,
&svc.name,
ip_addr,
net_def.gateway,
&["8.8.8.8".to_string(), "1.1.1.1".to_string()],
) {
log::warn!(
"container-start: dns: failed to register '{}' on {}: {}",
svc.name,
net_name,
e
);
}
}
let mut stdout_handle = child.take_stdout();
let mut stderr_handle = child.take_stderr();
let svc_name_log = svc.name.clone();
let t_stdout = std::thread::spawn(move || {
if let Some(mut src) = stdout_handle.take() {
let mut buf = [0u8; 4096];
while matches!(src.read(&mut buf), Ok(n) if n > 0) {}
}
});
let t_stderr = std::thread::spawn(move || {
if let Some(mut src) = stderr_handle.take() {
let mut buf = [0u8; 4096];
while matches!(src.read(&mut buf), Ok(n) if n > 0) {}
}
});
let all_ips_wait = all_ips.clone();
let t_waiter = std::thread::spawn(move || {
let _ = child.wait();
for (net_name, _) in &all_ips_wait {
let _ = crate::dns::dns_remove_entry(net_name, &svc_name_log);
}
});
drop(t_stdout);
drop(t_stderr);
thread_registry.lock().unwrap().push(t_waiter);
registry.lock().unwrap().push((container_name.clone(), pid));
log::info!(
"container-start: '{}' started (pid {}, ip {:?})",
container_name,
pid,
ip
);
Ok(SpawnResult {
name: container_name,
pid,
ip,
})
}
fn apply_inject_env(
spec: &mut ServiceSpec,
env_list: Value,
caller: &str,
) -> Result<(), LispError> {
for pair in env_list.to_vec()? {
match pair {
Value::Pair(p) => {
let k = match &p.0 {
Value::Str(s) => s.clone(),
Value::Symbol(s) => s.clone(),
other => {
return Err(LispError::new(format!(
"{}: inject env key must be string, got {}",
caller,
other.type_name()
)))
}
};
let v = match &p.1 {
Value::Str(s) => s.clone(),
other => format!("{}", other),
};
spec.env.insert(k, v);
}
other => {
return Err(LispError::new(format!(
"{}: inject must return (key . value) pairs, got {}",
caller,
other.type_name()
)))
}
}
}
Ok(())
}
fn stop_cascade(
handle: &Value,
registry: &Arc<Mutex<Vec<(String, i32)>>>,
) -> Result<(), LispError> {
match handle {
Value::ContainerHandle {
name, pid, deps, ..
} => {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(*pid),
nix::sys::signal::Signal::SIGTERM,
);
registry.lock().unwrap().retain(|(n, _)| n != name);
for dep in deps {
stop_cascade(dep, registry)?;
}
Ok(())
}
other => Err(LispError::new(format!(
"container-stop: expected container handle, got {}",
other.type_name()
))),
}
}
fn native<F>(env: &Env, name: &str, f: F)
where
F: Fn(&[Value]) -> Result<Value, LispError> + 'static,
{
use super::value::NativeFn;
env.borrow_mut().define(
name,
Value::Native(name.to_string(), Rc::new(f) as NativeFn),
);
}
fn extract_service_spec(fn_name: &str, v: &Value) -> Result<ServiceSpec, LispError> {
match v {
Value::ServiceSpec(s) => Ok(*s.clone()),
other => Err(LispError::new(format!(
"{}: expected service-spec, got {}",
fn_name,
other.type_name()
))),
}
}
fn extract_handle(fn_name: &str, v: &Value) -> Result<(String, i32), LispError> {
match v {
Value::ContainerHandle { name, pid, .. } => Ok((name.clone(), *pid)),
other => Err(LispError::new(format!(
"{}: expected container handle, got {}",
fn_name,
other.type_name()
))),
}
}
fn resolve_image(image_ref: &str) -> Result<(String, image::ImageManifest), LispError> {
if let Ok(m) = image::load_image(image_ref) {
return Ok((image_ref.to_string(), m));
}
let normalised = normalise_image_reference(image_ref);
let m = image::load_image(&normalised).map_err(|e| {
LispError::new(format!(
"image '{}' not found locally (run 'pelagos image pull {}'): {}",
image_ref, image_ref, e
))
})?;
Ok((normalised, m))
}
fn normalise_image_reference(r: &str) -> String {
let (name, tag) = r.split_once(':').map_or((r, "latest"), |(n, t)| (n, t));
if name.contains('/') {
if name.contains('.') || name.contains(':') {
format!("{}:{}", name, tag)
} else {
format!("docker.io/{}:{}", name, tag)
}
} else {
format!("docker.io/library/{}:{}", name, tag)
}
}
fn scoped_network_name(project: &str, net: &str) -> String {
let name = format!("{}-{}", project, net);
if name.len() > 12 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
name.hash(&mut hasher);
let h = hasher.finish();
format!("{}{:04x}", &name[..8], h as u16)
} else {
name
}
}
fn parse_memory(s: &str) -> Result<i64, String> {
let s = s.trim();
let (num, unit) = s
.find(|c: char| c.is_alphabetic())
.map(|i| (&s[..i], &s[i..]))
.unwrap_or((s, ""));
let base: i64 = num.parse().map_err(|_| format!("invalid memory: {}", s))?;
let mult = match unit.to_lowercase().as_str() {
"" | "b" => 1,
"k" | "kb" => 1024,
"m" | "mb" => 1024 * 1024,
"g" | "gb" => 1024 * 1024 * 1024,
_ => return Err(format!("unknown memory unit: {}", unit)),
};
Ok(base * mult)
}
fn parse_cpus(s: &str) -> Result<(i64, u64), String> {
let cpus: f64 = s
.trim()
.parse()
.map_err(|_| format!("invalid cpus: {}", s))?;
let period: u64 = 100_000;
let quota = (cpus * period as f64) as i64;
Ok((quota, period))
}
fn parse_user_in_layers(
user: &str,
layer_dirs: &[std::path::PathBuf],
) -> Result<(u32, Option<u32>), String> {
if let Some((uid_s, gid_s)) = user.split_once(':') {
if let (Ok(uid), Ok(gid)) = (uid_s.parse::<u32>(), gid_s.parse::<u32>()) {
return Ok((uid, Some(gid)));
}
}
if let Ok(uid) = user.parse::<u32>() {
return Ok((uid, None));
}
for layer in layer_dirs.iter().rev() {
let passwd = layer.join("etc/passwd");
if let Ok(contents) = std::fs::read_to_string(&passwd) {
for line in contents.lines() {
let fields: Vec<&str> = line.split(':').collect();
if fields.len() >= 4 && fields[0] == user {
let uid = fields[2].parse::<u32>().unwrap_or(0);
let gid = fields[3].parse::<u32>().ok();
return Ok((uid, gid));
}
}
}
}
Err(format!("user '{}' not found in image", user))
}