use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use notify::{Event, RecursiveMode, Watcher};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
use crate::api::v1alpha1::*;
use crate::store::Store;
use crate::starlingfile::{self, Cmd, Manifest, TargetKind};
pub struct Engine {
store: Arc<Store>,
manifests: Vec<Manifest>,
by_name: HashMap<String, usize>,
build_rx: mpsc::UnboundedReceiver<String>,
build_tx: mpsc::UnboundedSender<String>,
dry_run: bool,
config_path: PathBuf,
config_files: Vec<PathBuf>,
started_serves: HashSet<String>,
serve_tasks: HashMap<String, tokio::task::AbortHandle>,
restart_rx: mpsc::UnboundedReceiver<String>,
proxy: Option<crate::proxy::ProxyHandle>,
}
impl Engine {
#[allow(clippy::too_many_arguments)]
pub fn new(
store: Arc<Store>,
manifests: Vec<Manifest>,
build_rx: mpsc::UnboundedReceiver<String>,
build_tx: mpsc::UnboundedSender<String>,
dry_run: bool,
config_path: PathBuf,
config_files: Vec<PathBuf>,
restart_rx: mpsc::UnboundedReceiver<String>,
proxy: Option<crate::proxy::ProxyHandle>,
) -> Self {
let by_name = manifests
.iter()
.enumerate()
.map(|(i, m)| (m.name.clone(), i))
.collect();
Engine {
store,
manifests,
by_name,
build_rx,
build_tx,
dry_run,
config_path,
config_files,
started_serves: HashSet::new(),
serve_tasks: HashMap::new(),
restart_rx,
proxy,
}
}
fn reindex(&mut self) {
self.by_name = self
.manifests
.iter()
.enumerate()
.map(|(i, m)| (m.name.clone(), i))
.collect();
}
fn materialize_all(&self) {
for (i, m) in self.manifests.iter().enumerate() {
self.store.upsert_resource(initial_resource(m, i as i32));
self.store.upsert_button(disable_button(&m.name));
for note in &m.notes {
self.store.append_log(Some(&m.name), "INFO", &format!("{note}\n"));
}
}
}
fn start_serves(&mut self) {
for m in self.manifests.clone() {
if !m.serve_cmd.is_empty() && self.started_serves.insert(m.name.clone()) {
self.spawn_serve(m);
}
}
}
pub async fn run(mut self) {
self.materialize_all();
self.start_watchers();
self.start_serves();
for name in self.initial_build_order() {
self.run_build(&name).await;
}
let (reload_tx, mut reload_rx) = mpsc::unbounded_channel::<()>();
let mut watch = self.config_files.clone();
if watch.is_empty() {
watch.push(self.config_path.clone());
}
spawn_path_watcher(watch, reload_tx);
loop {
tokio::select! {
maybe = self.build_rx.recv() => {
let Some(name) = maybe else { break };
let mut pending = vec![name];
tokio::time::sleep(Duration::from_millis(150)).await;
while let Ok(extra) = self.build_rx.try_recv() {
if !pending.contains(&extra) {
pending.push(extra);
}
}
for name in pending {
self.run_build(&name).await;
}
}
_ = reload_rx.recv() => {
while reload_rx.try_recv().is_ok() {}
self.reload().await;
}
restart = self.restart_rx.recv() => {
let Some(name) = restart else { continue };
self.restart(&name).await;
}
}
}
}
fn config_span(&self) -> String {
let name = self
.config_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("Starlingfile");
format!("({name})")
}
async fn reload(&mut self) {
let span = self.config_span();
self.store
.append_log(Some(&span), "INFO", "Config changed; reloading...\n");
let result = match starlingfile::load(&self.config_path) {
Ok(r) => r,
Err(e) => {
self.store
.append_log(Some(&span), "ERROR", &format!("Reload failed: {e}\n"));
return;
}
};
for line in result.log.lines() {
self.store.append_log(Some(&span), "INFO", &format!("{line}\n"));
}
let new_names: HashSet<String> = result.manifests.iter().map(|m| m.name.clone()).collect();
let removed: Vec<String> = self
.manifests
.iter()
.map(|m| m.name.clone())
.filter(|n| !new_names.contains(n))
.collect();
for name in removed {
self.store.remove_resource(&name);
self.started_serves.remove(&name);
if let Some(handle) = &self.proxy {
handle.remove(&name).await;
}
}
self.manifests = result.manifests;
self.config_files = result.config_files;
self.reindex();
self.materialize_all();
self.start_serves();
self.store.append_log(
Some(&span),
"INFO",
&format!("Config reloaded ({} resources)\n", self.manifests.len()),
);
for name in self.initial_build_order() {
self.run_build(&name).await;
}
}
fn initial_build_order(&self) -> Vec<String> {
let mut ordered = vec![];
let mut visited = std::collections::HashSet::new();
fn visit(
name: &str,
engine: &Engine,
ordered: &mut Vec<String>,
visited: &mut std::collections::HashSet<String>,
) {
if !visited.insert(name.to_string()) {
return;
}
if let Some(&i) = engine.by_name.get(name) {
for dep in engine.manifests[i].resource_deps.clone() {
visit(&dep, engine, ordered, visited);
}
}
ordered.push(name.to_string());
}
for m in &self.manifests {
if m.auto_init {
visit(&m.name, self, &mut ordered, &mut visited);
}
}
ordered
.into_iter()
.filter(|n| self.by_name.contains_key(n))
.collect()
}
fn start_watchers(&self) {
for m in &self.manifests {
if m.deps.is_empty() {
continue;
}
let name = m.name.clone();
let tx = self.build_tx.clone();
let store = self.store.clone();
let deps = m.deps.clone();
let auto_on_change = m.auto_on_change();
std::thread::spawn(move || {
let (raw_tx, raw_rx) = std::sync::mpsc::channel::<notify::Result<Event>>();
let mut watcher = match notify::recommended_watcher(move |res| {
let _ = raw_tx.send(res);
}) {
Ok(w) => w,
Err(e) => {
store.append_log(
Some(&name),
"ERROR",
&format!("failed to create file watcher: {e}\n"),
);
return;
}
};
for dep in &deps {
let mode = if dep.is_dir() {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
};
if let Err(e) = watcher.watch(dep, mode) {
store.append_log(
Some(&name),
"WARN",
&format!("can't watch {}: {e}\n", dep.display()),
);
}
}
while let Ok(first) = raw_rx.recv() {
if !is_content_event(&first) {
continue;
}
while raw_rx.recv_timeout(Duration::from_millis(200)).is_ok() {}
store.append_log(Some(&name), "INFO", "Detected file change\n");
if auto_on_change {
if tx.send(name.clone()).is_err() {
break;
}
} else {
store.update_status(&name, |st| {
st.has_pending_changes = Some(true);
st.pending_build_since = Some(chrono::Utc::now().to_rfc3339());
});
}
}
});
}
}
fn spawn_serve(&mut self, mut m: Manifest) {
let store = self.store.clone();
let name = m.name.clone();
let proxy = self.proxy.clone();
let task = tokio::spawn(async move {
let proxy = if m.kind == TargetKind::Local { proxy } else { None };
let port = match (m.serve_port, &proxy) {
(Some(p), _) => Some(p),
(None, Some(handle)) => handle.allocate_port().await,
(None, None) => None,
};
if let Some(p) = port {
m.serve_cmd.env.push(("PORT".to_string(), p.to_string()));
m.serve_cmd.env.push(("HOST".to_string(), "127.0.0.1".to_string()));
}
if let (Some(handle), Some(p)) = (&proxy, port) {
let host = handle.hostname(&name);
handle.register(&name, p).await;
let url = handle.url_for(&name);
m.serve_cmd
.env
.push(("PORTLESS_URL".to_string(), url.clone()));
store.update_status(&name, |st| {
st.endpoint_links.retain(|l| l.name.as_deref() != Some(&host));
st.endpoint_links.insert(
0,
UIResourceLink {
url: Some(url.clone()),
name: Some(host.clone()),
},
);
});
store.append_log(
Some(&name),
"INFO",
&format!("Serving on {url} (PORT={p})\n"),
);
}
store.append_log(
Some(&name),
"INFO",
&format!("Running serve_cmd: {}\n", m.serve_cmd.display()),
);
store.update_status(&name, |st| {
st.runtime_status = Some("pending".to_string());
});
match spawn_streaming(&m.serve_cmd, &store, &name).await {
Ok(mut child) => {
store.update_status(&name, |st| {
st.runtime_status = Some("ok".to_string());
if let Some(pid) = child.id() {
st.local_resource_info = Some(UIResourceLocal {
pid: Some(pid as i64),
is_test: Some(false),
});
}
});
let status = child.wait().await;
let ok = status.map(|s| s.success()).unwrap_or(false);
store.update_status(&name, |st| {
st.runtime_status = Some(if ok { "none" } else { "error" }.to_string());
});
store.append_log(
Some(&name),
if ok { "INFO" } else { "ERROR" },
&format!("serve_cmd exited (ok={ok})\n"),
);
}
Err(e) => {
store.update_status(&name, |st| {
st.runtime_status = Some("error".to_string());
});
store.append_log(Some(&name), "ERROR", &format!("serve_cmd failed: {e}\n"));
}
}
});
self.serve_tasks.insert(m.name.clone(), task.abort_handle());
}
async fn restart(&mut self, name: &str) {
if let Some(handle) = self.serve_tasks.remove(name) {
handle.abort(); }
self.started_serves.remove(name);
if let Some(proxy) = &self.proxy {
proxy.remove(name).await;
}
self.store
.append_log(Some(name), "INFO", "Restarting serve_cmd...\n");
if let Some(&i) = self.by_name.get(name) {
let m = self.manifests[i].clone();
if !m.serve_cmd.is_empty() {
self.started_serves.insert(name.to_string());
self.spawn_serve(m);
}
}
}
async fn run_build(&self, name: &str) {
let Some(&i) = self.by_name.get(name) else {
return;
};
let m = self.manifests[i].clone();
if m.kind == TargetKind::Kubernetes {
self.run_k8s_build(name, &m).await;
return;
}
let now = Utc::now().to_rfc3339();
let span = format!("{name}:build");
if m.update_cmd.is_empty() {
self.store.update_status(name, |st| {
st.queued = Some(false);
st.pending_build_since = None;
st.update_status = Some("ok".to_string());
if st.runtime_status.is_none() {
st.runtime_status = Some(match m.kind {
TargetKind::Local => "not_applicable".to_string(),
_ => "pending".to_string(),
});
}
});
return;
}
self.store.update_status(name, |st| {
st.queued = Some(false);
st.pending_build_since = None;
st.update_status = Some("in_progress".to_string());
st.current_build = Some(UIBuildRunning {
start_time: Some(now.clone()),
span_id: Some(span.clone()),
});
});
self.store.append_log(
Some(name),
"INFO",
&format!("Building: {}\n", m.update_cmd.display()),
);
let result = run_to_completion(&m.update_cmd, &self.store, name).await;
let finish = Utc::now().to_rfc3339();
let error = match result {
Ok(true) => None,
Ok(false) => Some("command exited non-zero".to_string()),
Err(e) => Some(e),
};
let ok = error.is_none();
if let Some(err) = &error {
self.store
.append_log(Some(name), "ERROR", &format!("Build failed: {err}\n"));
} else {
self.store
.append_log(Some(name), "INFO", "Build succeeded\n");
}
self.store.update_status(name, |st| {
st.current_build = None;
st.last_deploy_time = Some(finish.clone());
st.update_status = Some(if ok { "ok" } else { "error" }.to_string());
if m.serve_cmd.is_empty() && st.runtime_status.is_none() {
st.runtime_status = Some("not_applicable".to_string());
}
st.build_history.insert(
0,
UIBuildTerminated {
start_time: Some(now.clone()),
finish_time: Some(finish.clone()),
span_id: Some(span.clone()),
error: error.clone(),
..Default::default()
},
);
st.build_history.truncate(10);
});
}
async fn run_k8s_build(&self, name: &str, m: &Manifest) {
if !m.live_update.is_empty() && self.store.build_count(name) > 0 && !self.dry_run {
if let Some(pod) = self.store.current_pod(name) {
self.live_update(name, m, &pod).await;
return;
}
}
let now = Utc::now().to_rfc3339();
let span = format!("{name}:build");
self.store.update_status(name, |st| {
st.queued = Some(false);
st.pending_build_since = None;
st.update_status = Some("in_progress".to_string());
st.current_build = Some(UIBuildRunning {
start_time: Some(now.clone()),
span_id: Some(span.clone()),
});
});
let mut error: Option<String> = None;
for db in &m.docker_builds {
self.store
.append_log(Some(name), "INFO", &format!("Building image: {}\n", db.image_ref));
match build_image(db, &self.store, name).await {
Ok(()) => {
if !self.dry_run {
kind_load(&db.image_ref, &self.store, name).await;
}
}
Err(e) => error = Some(e),
}
if error.is_some() {
break;
}
}
if error.is_none() && !m.k8s_apply_docs.is_empty() {
let docs = m.k8s_apply_docs.join("\n---\n");
let mut argv = vec![
"kubectl".to_string(),
"apply".to_string(),
"-f".to_string(),
"-".to_string(),
];
if self.dry_run {
argv.push("--dry-run=client".to_string());
argv.push("--validate=false".to_string());
}
self.store.append_log(
Some(name),
"INFO",
&format!(
"kubectl apply{}\n",
if self.dry_run { " (dry-run=client)" } else { "" }
),
);
match run_with_stdin(&argv, &docs, &self.store, name).await {
Ok(true) => {}
Ok(false) => error = Some("kubectl apply failed".to_string()),
Err(e) => error = Some(e),
}
}
let finish = Utc::now().to_rfc3339();
let ok = error.is_none();
if let Some(err) = &error {
self.store
.append_log(Some(name), "ERROR", &format!("Deploy failed: {err}\n"));
} else {
self.store
.append_log(Some(name), "INFO", "Deploy succeeded\n");
}
self.store.update_status(name, |st| {
st.current_build = None;
st.last_deploy_time = Some(finish.clone());
st.update_status = Some(if ok { "ok" } else { "error" }.to_string());
if !ok {
st.runtime_status = Some("error".to_string());
} else if self.dry_run {
st.runtime_status = Some("not_applicable".to_string());
}
st.build_history.insert(
0,
UIBuildTerminated {
start_time: Some(now.clone()),
finish_time: Some(finish.clone()),
span_id: Some(span.clone()),
error: error.clone(),
..Default::default()
},
);
st.build_history.truncate(10);
});
if ok && !self.dry_run && !m.pod_selector.is_empty() {
self.spawn_pod_watch(name.to_string(), m.pod_selector.clone());
}
}
async fn live_update(&self, name: &str, m: &Manifest, pod: &str) {
use crate::starlingfile::LiveUpdateStep;
let now = Utc::now().to_rfc3339();
let span = format!("{name}:build");
self.store.update_status(name, |st| {
st.update_status = Some("in_progress".to_string());
st.current_build = Some(UIBuildRunning {
start_time: Some(now.clone()),
span_id: Some(span.clone()),
});
});
self.store
.append_log(Some(name), "INFO", "Live update (no rebuild)\n");
let mut error: Option<String> = None;
for step in &m.live_update {
let argv = match step {
LiveUpdateStep::Sync { local, remote } => {
self.store.append_log(
Some(name),
"INFO",
&format!(" sync {local} -> {remote}\n"),
);
vec![
"kubectl".to_string(),
"cp".to_string(),
local.clone(),
format!("{pod}:{remote}"),
]
}
LiveUpdateStep::Run { cmd } => {
self.store
.append_log(Some(name), "INFO", &format!(" run {cmd}\n"));
vec![
"kubectl".to_string(),
"exec".to_string(),
pod.to_string(),
"--".to_string(),
"sh".to_string(),
"-c".to_string(),
cmd.clone(),
]
}
LiveUpdateStep::RestartContainer => {
self.store
.append_log(Some(name), "INFO", " restart_container\n");
vec![
"kubectl".to_string(),
"delete".to_string(),
"pod".to_string(),
pod.to_string(),
"--wait=false".to_string(),
]
}
LiveUpdateStep::FallBackOn(_) | LiveUpdateStep::InitialSync => continue,
};
let cmd = Cmd {
argv,
workdir: None,
env: vec![],
};
match run_to_completion(&cmd, &self.store, name).await {
Ok(true) => {}
Ok(false) => error = Some("live_update step failed".to_string()),
Err(e) => error = Some(e),
}
if error.is_some() {
break;
}
}
let finish = Utc::now().to_rfc3339();
let ok = error.is_none();
self.store.append_log(
Some(name),
if ok { "INFO" } else { "ERROR" },
&format!("Live update {}\n", if ok { "complete" } else { "failed" }),
);
self.store.update_status(name, |st| {
st.current_build = None;
st.last_deploy_time = Some(finish.clone());
st.update_status = Some(if ok { "ok" } else { "error" }.to_string());
st.build_history.insert(
0,
UIBuildTerminated {
start_time: Some(now.clone()),
finish_time: Some(finish.clone()),
span_id: Some(span.clone()),
error: error.clone(),
..Default::default()
},
);
st.build_history.truncate(10);
});
}
fn spawn_pod_watch(&self, name: String, selector: std::collections::BTreeMap<String, String>) {
let store = self.store.clone();
let sel = selector
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join(",");
tokio::spawn(async move {
let mut streaming_pod: Option<String> = None;
loop {
let out = Command::new("kubectl")
.args(["get", "pods", "-l", &sel, "-o", "json"])
.output()
.await;
let Ok(out) = out else {
break;
};
if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&out.stdout) {
if let Some(pod) = json["items"].as_array().and_then(|a| a.first()) {
let pod_name = pod["metadata"]["name"].as_str().unwrap_or("").to_string();
let phase = pod["status"]["phase"].as_str().unwrap_or("Unknown").to_string();
let restarts = pod["status"]["containerStatuses"]
.as_array()
.map(|cs| cs.iter().map(|c| c["restartCount"].as_i64().unwrap_or(0)).sum::<i64>())
.unwrap_or(0);
let ready = pod["status"]["containerStatuses"]
.as_array()
.map(|cs| cs.iter().all(|c| c["ready"].as_bool().unwrap_or(false)))
.unwrap_or(false);
let runtime = match phase.as_str() {
"Running" if ready => "ok",
"Running" | "Pending" => "pending",
"Succeeded" => "ok",
"Failed" => "error",
_ => "pending",
};
store.update_status(&name, |st| {
st.runtime_status = Some(runtime.to_string());
st.k8s_resource_info = Some(UIResourceKubernetes {
pod_name: Some(pod_name.clone()),
pod_status: Some(phase.clone()),
all_containers_ready: Some(ready),
pod_restarts: Some(restarts as i32),
span_id: Some(format!("{name}:pod")),
..Default::default()
});
});
if streaming_pod.as_deref() != Some(pod_name.as_str()) && !pod_name.is_empty()
{
streaming_pod = Some(pod_name.clone());
stream_pod_logs(pod_name, name.clone(), store.clone());
}
}
}
tokio::time::sleep(Duration::from_secs(3)).await;
}
});
}
}
async fn kind_load(image_ref: &str, store: &Arc<Store>, span: &str) {
let ctx = match Command::new("kubectl")
.args(["config", "current-context"])
.output()
.await
{
Ok(o) if o.status.success() => String::from_utf8_lossy(&o.stdout).trim().to_string(),
_ => return,
};
let Some(cluster) = ctx.strip_prefix("kind-") else {
return;
};
store.append_log(
Some(span),
"INFO",
&format!("Loading {image_ref} into kind cluster '{cluster}'\n"),
);
let cmd = Cmd {
argv: vec![
"kind".to_string(),
"load".to_string(),
"docker-image".to_string(),
image_ref.to_string(),
"--name".to_string(),
cluster.to_string(),
],
workdir: None,
env: vec![],
};
let _ = run_to_completion(&cmd, store, span).await;
}
async fn build_image(
db: &crate::starlingfile::DockerBuild,
store: &Arc<Store>,
span: &str,
) -> Result<(), String> {
use bollard::image::BuildImageOptions;
use futures::StreamExt;
if let Some(command) = &db.command {
let mut cmd = command.clone();
cmd.env.push(("EXPECTED_REF".to_string(), db.image_ref.clone()));
return match run_to_completion(&cmd, store, span).await {
Ok(true) => Ok(()),
Ok(false) => Err(format!("custom_build {} command failed", db.image_ref)),
Err(e) => Err(e),
};
}
let docker = bollard::Docker::connect_with_local_defaults()
.map_err(|e| format!("connecting to Docker daemon: {e}"))?;
let context = db.context.clone();
let tar = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
let mut builder = tar::Builder::new(Vec::new());
builder.append_dir_all(".", &context)?;
builder.into_inner()
})
.await
.map_err(|e| e.to_string())?
.map_err(|e| format!("taring build context {}: {e}", db.context.display()))?;
let dockerfile = db
.dockerfile
.as_ref()
.and_then(|p| p.file_name())
.and_then(|n| n.to_str())
.unwrap_or("Dockerfile")
.to_string();
let options = BuildImageOptions {
dockerfile,
t: db.image_ref.clone(),
rm: true,
forcerm: true,
buildargs: db.build_args.iter().cloned().collect(),
..Default::default()
};
let mut stream = docker.build_image(options, None, Some(tar.into()));
while let Some(item) = stream.next().await {
match item {
Ok(info) => {
if let Some(s) = info.stream {
for line in s.lines() {
store.append_log(Some(span), "INFO", &format!("{line}\n"));
}
}
if let Some(err) = info.error {
store.append_log(Some(span), "ERROR", &format!("{err}\n"));
return Err(format!("docker build {}: {err}", db.image_ref));
}
}
Err(e) => return Err(format!("docker build {}: {e}", db.image_ref)),
}
}
Ok(())
}
fn spawn_path_watcher(files: Vec<std::path::PathBuf>, tx: mpsc::UnboundedSender<()>) {
std::thread::spawn(move || {
let (raw_tx, raw_rx) = std::sync::mpsc::channel::<notify::Result<Event>>();
let mut watcher = match notify::recommended_watcher(move |res| {
let _ = raw_tx.send(res);
}) {
Ok(w) => w,
Err(_) => return,
};
let canon: std::collections::HashSet<std::path::PathBuf> = files
.iter()
.map(|f| std::fs::canonicalize(f).unwrap_or_else(|_| f.clone()))
.collect();
let mut dirs: Vec<std::path::PathBuf> = files
.iter()
.map(|f| match f.parent() {
Some(p) if !p.as_os_str().is_empty() => p.to_path_buf(),
_ => std::path::PathBuf::from("."),
})
.collect();
dirs.sort();
dirs.dedup();
for dir in &dirs {
let _ = watcher.watch(dir, RecursiveMode::NonRecursive);
}
while let Ok(ev) = raw_rx.recv() {
if !is_content_event(&ev) {
continue;
}
let touches = match &ev {
Ok(e) => e.paths.iter().any(|p| {
let c = std::fs::canonicalize(p).unwrap_or_else(|_| p.clone());
canon.contains(&c) || canon.contains(p)
}),
_ => false,
};
if !touches {
continue;
}
while raw_rx.recv_timeout(Duration::from_millis(200)).is_ok() {}
if tx.send(()).is_err() {
break;
}
}
});
}
fn stream_pod_logs(pod: String, span: String, store: Arc<Store>) {
tokio::spawn(async move {
let mut child = match Command::new("kubectl")
.args(["logs", "-f", "--all-containers", "--tail", "20", &pod])
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()
{
Ok(c) => c,
Err(_) => return,
};
if let Some(out) = child.stdout.take() {
stream_lines(out, store, span, "INFO");
}
let _ = child.wait().await;
});
}
fn disable_button(resource: &str) -> UIButton {
let mut annotations = std::collections::BTreeMap::new();
annotations.insert(
"tilt.dev/uibutton-type".to_string(),
"DisableToggle".to_string(),
);
UIButton {
metadata: Some(ObjectMeta {
name: format!("{resource}-disable"),
uid: uuid::Uuid::new_v4().to_string(),
annotations: Some(annotations),
..Default::default()
}),
spec: Some(UIButtonSpec {
location: UIComponentLocation {
component_id: resource.to_string(),
component_type: "Resource".to_string(),
},
text: "Disable".to_string(),
icon_name: Some("toggle_on".to_string()),
..Default::default()
}),
status: Some(Default::default()),
}
}
fn initial_resource(m: &Manifest, order: i32) -> UIResource {
let mut st = UIResourceStatus {
order: Some(order),
trigger_mode: Some(m.trigger_mode),
update_status: Some("pending".to_string()),
runtime_status: Some(match m.kind {
TargetKind::Local if m.serve_cmd.is_empty() => "not_applicable".to_string(),
_ => "pending".to_string(),
}),
specs: vec![UIResourceTargetSpec {
id: Some(m.name.clone()),
target_type: Some(m.kind.target_type().to_string()),
has_live_update: Some(false),
}],
disable_status: Some(DisableResourceStatus {
enabled_count: 1,
disabled_count: 0,
state: "Enabled".to_string(),
sources: vec![],
}),
endpoint_links: m
.links
.iter()
.map(|(url, name)| UIResourceLink {
url: Some(url.clone()),
name: Some(name.clone()),
})
.collect(),
..Default::default()
};
if m.kind == TargetKind::Local {
st.local_resource_info = Some(UIResourceLocal {
pid: Some(0),
is_test: Some(false),
});
}
UIResource {
metadata: Some(ObjectMeta {
name: m.name.clone(),
uid: uuid::Uuid::new_v4().to_string(),
labels: if m.labels.is_empty() {
None
} else {
Some(m.labels.clone())
},
..Default::default()
}),
spec: Some(UIResourceSpec {}),
status: Some(st),
}
}
fn is_content_event(res: ¬ify::Result<Event>) -> bool {
match res {
Ok(ev) => matches!(
ev.kind,
notify::EventKind::Create(_)
| notify::EventKind::Modify(_)
| notify::EventKind::Remove(_)
),
Err(_) => false,
}
}
async fn spawn_streaming(
cmd: &Cmd,
store: &Arc<Store>,
span: &str,
) -> std::io::Result<tokio::process::Child> {
let mut command = Command::new(&cmd.argv[0]);
command
.args(&cmd.argv[1..])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
if let Some(dir) = &cmd.workdir {
command.current_dir(dir);
}
for (k, v) in &cmd.env {
command.env(k, v);
}
let mut child = command.spawn()?;
if let Some(out) = child.stdout.take() {
stream_lines(out, store.clone(), span.to_string(), "INFO");
}
if let Some(err) = child.stderr.take() {
stream_lines(err, store.clone(), span.to_string(), "INFO");
}
Ok(child)
}
async fn run_with_stdin(
argv: &[String],
stdin_data: &str,
store: &Arc<Store>,
span: &str,
) -> Result<bool, String> {
use tokio::io::AsyncWriteExt;
let mut child = Command::new(&argv[0])
.args(&argv[1..])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| format!("spawn {}: {e}", argv.join(" ")))?;
if let Some(mut stdin) = child.stdin.take() {
let data = stdin_data.to_string();
stdin
.write_all(data.as_bytes())
.await
.map_err(|e| e.to_string())?;
drop(stdin);
}
if let Some(out) = child.stdout.take() {
stream_lines(out, store.clone(), span.to_string(), "INFO");
}
if let Some(err) = child.stderr.take() {
stream_lines(err, store.clone(), span.to_string(), "ERROR");
}
let status = child.wait().await.map_err(|e| e.to_string())?;
Ok(status.success())
}
async fn run_to_completion(
cmd: &Cmd,
store: &Arc<Store>,
span: &str,
) -> Result<bool, String> {
let mut child = spawn_streaming(cmd, store, span)
.await
.map_err(|e| format!("spawn {}: {e}", cmd.display()))?;
let status = child.wait().await.map_err(|e| e.to_string())?;
Ok(status.success())
}
fn stream_lines<R>(reader: R, store: Arc<Store>, span: String, level: &'static str)
where
R: tokio::io::AsyncRead + Unpin + Send + 'static,
{
tokio::spawn(async move {
let mut lines = BufReader::new(reader).lines();
while let Ok(Some(line)) = lines.next_line().await {
store.append_log(Some(&span), level, &format!("{line}\n"));
}
});
}