use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail};
use clap::{Arg, Command};
use rquickjs::CatchResultExt;
use crate::mprocs::app::ClientId;
use crate::{
client::client_main,
console::create_console_task,
daemon::{
lockfile,
receiver::MsgReceiver,
sender::MsgSender,
socket::{bind_server_socket, connect_client_socket},
},
js::js_vm::JsVm,
kernel::{
kernel::Kernel,
kernel_message::{
KernelCommand, KernelQuery, KernelQueryResponse, TaskContext,
},
task::{TaskCmd, TaskStatus},
task_path::TaskPath,
},
lualib::init_std,
protocol::{CltToSrv, DkRequest, DkResponse, DkTaskInfo, SrvToClt},
term::Size,
};
/// Runs the daemon that manages `working_dir` until its kernel stops.
///
/// Sets up logging, takes the lock file for the directory, creates the kernel
/// together with a console task, spawns the procs found in the configuration
/// and serves client connections on the socket path reported by the lock
/// file guard.
///
/// `log_level` is handed to the logger as the level given on the command
/// line.
///
/// # Errors
///
/// Fails when logging cannot be initialized, when the lock file cannot be
/// created, or (Unix only) when the process waiter fails to initialize or
/// uninitialize.
async fn run_server(
  working_dir: PathBuf,
  log_level: Option<&str>,
) -> anyhow::Result<()> {
  let logging_config = crate::logging::Config {
    binary: "dk",
    cli_level: log_level,
    log_env: "DK_LOG",
    file_env: "DK_LOG_FILE",
    default_dir: Some(&working_dir),
  };
  // Bound to a name so that whatever `init` returns stays alive until this
  // function exits.
  let _logger = crate::logging::init(logging_config)?;

  let lock = lockfile::create_lock_file(&working_dir)?;
  log::info!("Lock file created for directory: {}", working_dir.display());

  #[cfg(unix)]
  crate::process::unix_processes_waiter::UnixProcessesWaiter::init()?;

  let kernel = Kernel::new();
  let ctx = kernel.context();
  let socket_path = lock.socket_path().to_path_buf();
  let (console_task_id, console_vt) = create_console_task(&ctx);
  let console_sender = ctx.get_task_sender(console_task_id);

  spawn_configured_procs(&ctx, &working_dir);

  // Listener task: binds the socket, then hands every accepted connection to
  // its own task.
  tokio::spawn(async move {
    let bound = match bind_server_socket(&socket_path).await {
      Ok(bound) => bound,
      Err(err) => {
        log::error!("Failed to bind the server: {:?}", err);
        // Without a socket nobody can reach the daemon, so ask the kernel to
        // quit.
        ctx.send(KernelCommand::Quit);
        return;
      }
    };
    log::info!("Server is listening.");

    #[cfg(unix)]
    let mut listener = bound;
    // On Windows the bind result is a pair; only its first element is used
    // for accepting.
    #[cfg(windows)]
    let mut listener = {
      let (sock, _addr) = bound;
      sock
    };

    log::debug!("Waiting for clients...");
    let mut last_client_id = 0;
    loop {
      let (sender, receiver) = match listener.accept().await {
        Ok(channel) => channel,
        Err(err) => {
          // NOTE(review): the listener task ends here while the kernel keeps
          // running, so no new client can connect afterwards — confirm this
          // is intended.
          log::debug!("Server socket accept error: {}", err);
          break;
        }
      };

      last_client_id += 1;
      tokio::spawn(dispatch_connection(
        ClientId(last_client_id),
        console_sender.clone(),
        ctx.clone(),
        console_task_id,
        console_vt.clone(),
        sender,
        receiver,
      ));
    }
  });

  kernel.run().await;

  // Release the lock explicitly once the kernel has finished.
  drop(lock);

  #[cfg(unix)]
  crate::process::unix_processes_waiter::UnixProcessesWaiter::uninit()?;

  Ok(())
}
fn print_task_list(resp: DkResponse) {
match resp {
DkResponse::TaskList(tasks) => {
if tasks.is_empty() {
println!("No tasks.");
} else {
for t in &tasks {
println!("{}\t{}", t.path, t.status);
}
}
}
DkResponse::Error(e) => eprintln!("Error: {}", e),
_ => eprintln!("Unexpected response"),
}
}
/// Spawns one proc task for every proc listed in the config loaded from
/// `working_dir`.
///
/// This is best-effort: a config that fails to load, a proc whose name does
/// not form a valid task path and a proc with an empty command are each
/// logged as a warning and skipped.
fn spawn_configured_procs(pc: &TaskContext, working_dir: &Path) {
  use crate::process::process_spec::ProcessSpec;
  use crate::task::proc_task::spawn_proc_task;

  let config = match crate::config::Config::load(working_dir) {
    Err(err) => {
      log::warn!("Failed to load config: {}", err);
      return;
    }
    Ok(loaded) => loaded,
  };

  for entry in config.procs.iter() {
    // Each configured proc lives directly under the root: `/<name>`.
    let task_path = match TaskPath::new(&format!("/{}", entry.name)) {
      Err(err) => {
        log::warn!("Invalid proc name {:?}: {}", entry.name, err);
        continue;
      }
      Ok(task_path) => task_path,
    };

    if entry.cmd.is_empty() {
      log::warn!("Proc {:?} has empty cmd; skipping", entry.name);
      continue;
    }

    let mut spec = ProcessSpec::from_argv(entry.cmd.clone());
    match &entry.cwd {
      // NOTE(review): the configured cwd is passed through unchanged and is
      // not joined with `working_dir` — confirm how relative values are
      // meant to be resolved.
      Some(dir) => {
        spec.cwd(dir);
      }
      // No cwd configured: run the proc in the managed directory.
      None => {
        spec.cwd(working_dir.to_string_lossy());
      }
    }

    spawn_proc_task(pc, task_path, spec);
  }
}
/// Routes a freshly accepted connection based on the first message it sends.
///
/// * `CltToSrv::Rpc` — the request is answered with a single `SrvToClt::Rpc`
///   reply; a failing handler is reported to the client as
///   `DkResponse::Error`.
/// * `CltToSrv::Init` — the connection is handed over to a client task
///   attached to the console task, using the announced terminal size.
/// * Anything else — the connection is dropped after logging why: the peer
///   closed without sending, the first frame failed to decode, or the message
///   kind is not valid as a first message.
async fn dispatch_connection(
  client_id: ClientId,
  app_sender: crate::kernel::kernel_message::TaskSender,
  pc: TaskContext,
  console_task_id: crate::kernel::task::TaskId,
  console_vt: crate::kernel::kernel_message::SharedVt,
  mut sender: MsgSender<SrvToClt>,
  mut receiver: MsgReceiver<CltToSrv>,
) {
  match receiver.recv().await {
    Some(Ok(CltToSrv::Rpc(req))) => {
      let resp = handle_rpc(&pc, req)
        .await
        .unwrap_or_else(|e| DkResponse::Error(e.to_string()));
      // Replying is best-effort (the client may already be gone), but the
      // failure is still worth a trace in the log.
      if let Err(err) = sender.send(SrvToClt::Rpc(resp)).await {
        log::debug!("Failed to send RPC response to client: {}", err);
      }
    }
    Some(Ok(CltToSrv::Init { width, height })) => {
      crate::console::spawn_client_task(
        &pc,
        console_task_id,
        console_vt,
        app_sender,
        client_id,
        Size { width, height },
        sender,
        receiver,
      );
    }
    Some(Ok(_)) => {
      log::warn!("Unexpected first message from client");
    }
    Some(Err(err)) => {
      log::warn!("Failed to decode first message from client: {}", err);
    }
    None => {
      log::debug!("Client disconnected before sending a first message");
    }
  }
}
async fn handle_rpc(
pc: &TaskContext,
req: DkRequest,
) -> anyhow::Result<DkResponse> {
let response = match req {
DkRequest::Ls { glob } => {
let query = KernelQuery::ListTasks(glob);
match pc.query(query).await? {
KernelQueryResponse::TaskList(tasks) => {
let items = tasks
.into_iter()
.map(|t| DkTaskInfo {
path: t
.path
.map(|p| p.to_string())
.unwrap_or_else(|| format!("<task:{}>", t.id.0)),
status: match t.status {
TaskStatus::Running => "running".to_string(),
TaskStatus::NotStarted => "not-started".to_string(),
TaskStatus::Exited(code) => format!("exited:{}", code),
},
})
.collect();
DkResponse::TaskList(items)
}
_ => DkResponse::Error("unexpected query response".to_string()),
}
}
DkRequest::Start { path } => {
let path = TaskPath::new(&path)?;
pc.send_to_path(path, TaskCmd::Start);
DkResponse::Ok
}
DkRequest::Stop { path } => {
let path = TaskPath::new(&path)?;
pc.send_to_path(path, TaskCmd::Stop);
DkResponse::Ok
}
DkRequest::Kill { path } => {
let path = TaskPath::new(&path)?;
pc.send_to_path(path, TaskCmd::Kill);
DkResponse::Ok
}
DkRequest::Restart { path } => {
let path = TaskPath::new(&path)?;
pc.send_to_path(path.clone(), TaskCmd::Stop);
pc.send_to_path(path, TaskCmd::Start);
DkResponse::Ok
}
DkRequest::Screen { path } => {
let path = TaskPath::new(&path)?;
let query = KernelQuery::GetScreen(path);
match pc.query(query).await? {
KernelQueryResponse::Screen(content) => DkResponse::Screen(content),
_ => DkResponse::Error("unexpected query response".to_string()),
}
}
DkRequest::Spawn { path, cmd, cwd } => {
let task_path = TaskPath::new(&path)?;
if cmd.is_empty() {
bail!("cmd must not be empty".to_string());
}
let mut spec = crate::process::process_spec::ProcessSpec::from_argv(cmd);
if let Some(cwd) = cwd {
spec.cwd(cwd);
} else if let Ok(cwd) = std::env::current_dir() {
spec.cwd(cwd.to_string_lossy());
}
crate::task::proc_task::spawn_proc_task(pc, task_path, spec);
DkResponse::Ok
}
};
Ok(response)
}
async fn rpc_request(
working_dir: &Path,
req: DkRequest,
spawn_server: bool,
) -> anyhow::Result<DkResponse> {
let (mut sender, mut receiver) =
connect_client_socket::<CltToSrv, SrvToClt>(working_dir, spawn_server)
.await?;
sender.send(CltToSrv::Rpc(req)).await?;
match receiver.recv().await {
Some(Ok(SrvToClt::Rpc(resp))) => Ok(resp),
Some(Ok(other)) => {
anyhow::bail!("unexpected response: {:?}", other)
}
Some(Err(e)) => anyhow::bail!("decode error: {}", e),
None => anyhow::bail!("connection closed without response"),
}
}
pub async fn dekit_main() -> anyhow::Result<()> {
println!("* Welcome to dekit — playground for future features *\n");
let cmd = clap::command!()
.subcommands([
Command::new("attach"),
Command::new("up"),
Command::new("down"),
Command::new("spawn")
.about("Create and start a new task at the given path")
.arg(
Arg::new("path")
.long("path")
.required(true)
.help("Task path (e.g. /services/web)"),
)
.arg(
Arg::new("cwd")
.long("cwd")
.help("Working directory for the process"),
)
.arg(
Arg::new("cmd")
.required(true)
.num_args(1..)
.last(true)
.help("Command to run"),
),
Command::new("ls")
.about("List tasks")
.arg(Arg::new("glob").help("Optional glob pattern")),
Command::new("start")
.about("Start a stopped task")
.arg(Arg::new("path").required(true).help("Task path")),
Command::new("stop")
.about("Gracefully stop a task")
.arg(Arg::new("path").required(true).help("Task path")),
Command::new("kill")
.about("Force kill a task")
.arg(Arg::new("path").required(true).help("Task path")),
Command::new("restart")
.about("Restart a task")
.arg(Arg::new("path").required(true).help("Task path")),
Command::new("screen")
.about("Print the current screen of a task")
.arg(Arg::new("path").required(true).help("Task path")),
Command::new("server").subcommands([
Command::new("run")
.arg(
Arg::new("dir")
.long("dir")
.required(true)
.help("Working directory this daemon manages"),
)
.arg(
Arg::new("log-level")
.long("log-level")
.help("Diagnostic log level (off|error|warn|info|debug|trace, or env_logger spec). Falls back to $DK_LOG, $RUST_LOG, then 'error' (release) or 'trace' (debug)."),
),
Command::new("start")
.about("Start the daemon for the current directory"),
Command::new("stop").about("Stop the daemon for the current directory"),
Command::new("status")
.about("Show daemon status for the current directory"),
Command::new("list").about("List all daemons on this machine"),
Command::new("clean").about("Remove stale lock files"),
]),
])
.arg(
Arg::new("files")
.action(clap::ArgAction::Append)
.trailing_var_arg(true),
);
let matches = cmd.get_matches();
match matches.subcommand() {
Some(("attach", _sub_m)) => {
let working_dir = std::env::current_dir()?;
let (sender, receiver) =
connect_client_socket::<CltToSrv, SrvToClt>(&working_dir, false)
.await?;
client_main(sender, receiver).await?;
}
Some(("spawn", sub_m)) => {
let working_dir = std::env::current_dir()?;
let path = sub_m.get_one::<String>("path").unwrap().clone();
let cwd = sub_m.get_one::<String>("cwd").cloned();
let cmd: Vec<String> =
sub_m.get_many::<String>("cmd").unwrap().cloned().collect();
let resp =
rpc_request(&working_dir, DkRequest::Spawn { path, cmd, cwd }, true)
.await?;
match resp {
DkResponse::Ok => println!("Spawned."),
DkResponse::Error(e) => eprintln!("Error: {}", e),
_ => eprintln!("Unexpected response"),
}
}
Some(("ls", sub_m)) => {
let working_dir = std::env::current_dir()?;
let glob = sub_m.get_one::<String>("glob").cloned();
let resp =
rpc_request(&working_dir, DkRequest::Ls { glob }, false).await?;
print_task_list(resp);
}
Some(("start", sub_m)) => {
let working_dir = std::env::current_dir()?;
let path = sub_m.get_one::<String>("path").unwrap().clone();
let resp =
rpc_request(&working_dir, DkRequest::Start { path }, true).await?;
match resp {
DkResponse::Ok => println!("Started."),
DkResponse::Error(e) => eprintln!("Error: {}", e),
_ => eprintln!("Unexpected response"),
}
}
Some(("stop", sub_m)) => {
let working_dir = std::env::current_dir()?;
let path = sub_m.get_one::<String>("path").unwrap().clone();
let resp =
rpc_request(&working_dir, DkRequest::Stop { path }, false).await?;
match resp {
DkResponse::Ok => println!("Stopped."),
DkResponse::Error(e) => eprintln!("Error: {}", e),
_ => eprintln!("Unexpected response"),
}
}
Some(("kill", sub_m)) => {
let working_dir = std::env::current_dir()?;
let path = sub_m.get_one::<String>("path").unwrap().clone();
let resp =
rpc_request(&working_dir, DkRequest::Kill { path }, false).await?;
match resp {
DkResponse::Ok => println!("Killed."),
DkResponse::Error(e) => eprintln!("Error: {}", e),
_ => eprintln!("Unexpected response"),
}
}
Some(("restart", sub_m)) => {
let working_dir = std::env::current_dir()?;
let path = sub_m.get_one::<String>("path").unwrap().clone();
let resp =
rpc_request(&working_dir, DkRequest::Restart { path }, true).await?;
match resp {
DkResponse::Ok => println!("Restarted."),
DkResponse::Error(e) => eprintln!("Error: {}", e),
_ => eprintln!("Unexpected response"),
}
}
Some(("screen", sub_m)) => {
let working_dir = std::env::current_dir()?;
let path = sub_m.get_one::<String>("path").unwrap().clone();
let resp =
rpc_request(&working_dir, DkRequest::Screen { path }, false).await?;
match resp {
DkResponse::Screen(Some(content)) => {
print!("{}", content);
print!("\x1b[0m\n");
}
DkResponse::Screen(None) => {
eprintln!("No screen content for this task.");
}
DkResponse::Error(e) => eprintln!("Error: {}", e),
_ => eprintln!("Unexpected response"),
}
}
Some(("up", _sub_m)) => {
let working_dir = std::env::current_dir()?;
let resp =
rpc_request(&working_dir, DkRequest::Ls { glob: None }, true).await?;
print_task_list(resp);
}
Some(("down", _sub_m)) => {
let working_dir = std::env::current_dir()?;
lockfile::stop_daemon(&working_dir)?;
println!("Daemon stopped.");
}
Some(("server", sub_m)) => match sub_m.subcommand() {
Some(("run", run_m)) => {
let dir = run_m.get_one::<String>("dir").unwrap();
let log_level =
run_m.get_one::<String>("log-level").map(String::as_str);
run_server(PathBuf::from(dir), log_level).await?;
}
Some(("start", _sub_m)) => {
let working_dir = std::env::current_dir()?;
if let Some(info) = lockfile::get_daemon_status(&working_dir)? {
if info.is_running {
println!("Daemon already running (pid={}).", info.contents.pid);
return Ok(());
}
lockfile::cleanup_stale(&working_dir)?;
}
crate::daemon::daemon::spawn_server_daemon(&working_dir)?;
println!("Daemon started for {}.", working_dir.display());
}
Some(("stop", _sub_m)) => {
let working_dir = std::env::current_dir()?;
lockfile::stop_daemon(&working_dir)?;
println!("Daemon stopped.");
}
Some(("status", _sub_m)) => {
let working_dir = std::env::current_dir()?;
match lockfile::get_daemon_status(&working_dir)? {
Some(info) => {
let status = if info.is_running { "running" } else { "stale" };
println!(
"[{}] pid={} socket={} version={}",
status,
info.contents.pid,
info.contents.socket,
info.contents.version,
);
}
None => {
println!("No daemon for this directory.");
}
}
}
Some(("list", _sub_m)) => {
let daemons = lockfile::list_daemons()?;
if daemons.is_empty() {
println!("No daemons found.");
} else {
for d in &daemons {
let status = if d.is_running { "running" } else { "stale" };
println!(
"[{}] pid={} dir={} socket={} version={}",
status,
d.contents.pid,
d.contents.working_dir,
d.contents.socket,
d.contents.version,
);
}
}
}
Some(("clean", _sub_m)) => {
let count = lockfile::cleanup_all_stale()?;
println!("Removed {} stale lock file(s).", count);
}
_ => {
println!("Expected more arguments after `dk server`");
}
},
Some((arg, _sub_m)) => {
println!("Unknown: {}", arg);
}
None => {
let paths = matches
.get_many::<String>("files")
.map(|p| p.collect::<Vec<_>>())
.unwrap_or_default();
if let Some(first) = paths.first() {
if first.ends_with(".lua") {
let src = std::fs::read_to_string(first)?;
let lua = mlua::Lua::new();
let cancel = tokio_util::sync::CancellationToken::new();
lua.set_app_data(cancel.clone());
lua
.globals()
.set("std", init_std(&lua).map_err(|e| anyhow!("{}", e))?)
.map_err(|e| anyhow!("{}", e))?;
let chunk = lua.load(src.clone());
let f: mlua::Function = chunk.eval().map_err(|e| anyhow!("{}", e))?;
let r = f
.call_async::<mlua::Value>(())
.await
.map_err(|e| anyhow!("{}", e))?;
println!("-> {:?}", r);
cancel.cancel();
}
else if first.ends_with(".js") {
let src = std::fs::read_to_string(first)?;
let vm = JsVm::new().await?;
let root =
vm.eval_file(Path::new("dekit.js"), src.as_bytes()).await?;
let r: anyhow::Result<()> =
rquickjs::async_with!(vm.context => |ctx| {
run_module_main(&ctx, &root).await
})
.await;
r?;
}
} else {
let working_dir = std::env::current_dir()?;
let (sender, receiver) =
connect_client_socket::<CltToSrv, SrvToClt>(&working_dir, true)
.await?;
client_main(sender, receiver).await?;
}
}
}
Ok(())
}
async fn run_module_main(
ctx: &rquickjs::Ctx<'_>,
root: &rquickjs::Persistent<rquickjs::Object<'static>>,
) -> anyhow::Result<()> {
let m = map_js_error(
ctx,
root.clone().restore(ctx),
"Failed to restore module namespace",
)?;
let main = map_js_error(
ctx,
m.get::<_, rquickjs::Value>("main"),
"Failed to read exported `main`",
)?;
let val = match main.type_of() {
rquickjs::Type::Constructor => map_js_error(
ctx,
main
.into_constructor()
.expect("Type checked as constructor")
.call::<_, rquickjs::Value>(()),
"Error while calling exported constructor `main`",
)?,
rquickjs::Type::Function => map_js_error(
ctx,
main
.into_function()
.expect("Type checked as function")
.call(()),
"Error while calling exported function `main`",
)?,
t => anyhow::bail!("Exported `main` is not a function ({}).", t.as_str()),
};
let val = if let Some(promise) = val.clone().into_promise() {
map_js_error(
ctx,
promise.into_future::<rquickjs::Value<'_>>().await,
"Unhandled rejection in exported `main`",
)?
} else {
val
};
println!("-> {:?}", val);
Ok(())
}
/// Converts a `rquickjs` result into an `anyhow` result.
///
/// On failure the error is caught through `ctx` and rendered as
/// `"<scope>:\n<caught error>"`; a successful value is passed through
/// untouched.
fn map_js_error<T>(
  ctx: &rquickjs::Ctx<'_>,
  result: rquickjs::Result<T>,
  scope: &str,
) -> anyhow::Result<T> {
  match result.catch(ctx) {
    Ok(value) => Ok(value),
    Err(caught) => Err(anyhow!("{scope}:\n{caught}")),
  }
}