1use std::path::{Path, PathBuf};
2
3use anyhow::{anyhow, bail};
4use clap::{Arg, Command};
5use rquickjs::CatchResultExt;
6
7use crate::mprocs::app::ClientId;
8use crate::{
9 client::client_main,
10 console::create_console_task,
11 daemon::{
12 lockfile,
13 receiver::MsgReceiver,
14 sender::MsgSender,
15 socket::{bind_server_socket, connect_client_socket},
16 },
17 js::js_vm::JsVm,
18 kernel::{
19 kernel::Kernel,
20 kernel_message::{
21 KernelCommand, KernelQuery, KernelQueryResponse, TaskContext,
22 },
23 task::{TaskCmd, TaskStatus},
24 task_path::TaskPath,
25 },
26 lualib::init_std,
27 protocol::{CltToSrv, DkRequest, DkResponse, DkTaskInfo, SrvToClt},
28 term::Size,
29};
30
31async fn run_server(
32 working_dir: PathBuf,
33 log_level: Option<&str>,
34) -> anyhow::Result<()> {
35 let _logger = crate::logging::init(crate::logging::Config {
36 binary: "dk",
37 cli_level: log_level,
38 log_env: "DK_LOG",
39 file_env: "DK_LOG_FILE",
40 default_dir: Some(&working_dir),
41 })?;
42
43 let lock_guard = lockfile::create_lock_file(&working_dir)?;
45 log::info!("Lock file created for directory: {}", working_dir.display());
46
47 #[cfg(unix)]
48 crate::process::unix_processes_waiter::UnixProcessesWaiter::init()?;
49 let kernel = Kernel::new();
50 let pc = kernel.context();
51
52 let socket_path = lock_guard.socket_path().to_path_buf();
53 let (app_task_id, console_vt) = create_console_task(&pc);
54 let app_sender = pc.get_task_sender(app_task_id);
55
56 spawn_configured_procs(&pc, &working_dir);
57
58 tokio::spawn(async move {
59 let mut last_client_id = 0;
60
61 let mut server_socket = match bind_server_socket(&socket_path).await {
62 Ok(server_socket) => {
63 log::info!("Server is listening.");
64 #[cfg(unix)]
65 {
66 server_socket
67 }
68 #[cfg(windows)]
69 {
70 let (sock, _addr) = server_socket;
71 sock
72 }
73 }
74 Err(err) => {
75 log::error!("Failed to bind the server: {:?}", err);
76 pc.send(KernelCommand::Quit);
77 return;
78 }
79 };
80 log::debug!("Waiting for clients...");
81 loop {
82 match server_socket.accept().await {
83 Ok((sender, receiver)) => {
84 last_client_id += 1;
85 let client_id = ClientId(last_client_id);
86 let app_sender = app_sender.clone();
87 let pc = pc.clone();
88 let console_task_id = app_task_id;
89 let console_vt = console_vt.clone();
90 tokio::spawn(async move {
91 dispatch_connection(
92 client_id,
93 app_sender,
94 pc,
95 console_task_id,
96 console_vt,
97 sender,
98 receiver,
99 )
100 .await;
101 });
102 }
103 Err(err) => {
104 log::debug!("Server socket accept error: {}", err);
105 break;
106 }
107 }
108 }
109 });
110
111 kernel.run().await;
112
113 drop(lock_guard);
115
116 #[cfg(unix)]
117 crate::process::unix_processes_waiter::UnixProcessesWaiter::uninit()?;
118
119 Ok(())
120}
121
122fn print_task_list(resp: DkResponse) {
123 match resp {
124 DkResponse::TaskList(tasks) => {
125 if tasks.is_empty() {
126 println!("No tasks.");
127 } else {
128 for t in &tasks {
129 println!("{}\t{}", t.path, t.status);
130 }
131 }
132 }
133 DkResponse::Error(e) => eprintln!("Error: {}", e),
134 _ => eprintln!("Unexpected response"),
135 }
136}
137
138fn spawn_configured_procs(pc: &TaskContext, working_dir: &Path) {
139 let config = match crate::config::Config::load(working_dir) {
140 Ok(cfg) => cfg,
141 Err(err) => {
142 log::warn!("Failed to load config: {}", err);
143 return;
144 }
145 };
146 for proc in &config.procs {
147 let path = match TaskPath::new(&format!("/{}", proc.name)) {
148 Ok(p) => p,
149 Err(err) => {
150 log::warn!("Invalid proc name {:?}: {}", proc.name, err);
151 continue;
152 }
153 };
154 if proc.cmd.is_empty() {
155 log::warn!("Proc {:?} has empty cmd; skipping", proc.name);
156 continue;
157 }
158 let mut spec =
159 crate::process::process_spec::ProcessSpec::from_argv(proc.cmd.clone());
160 if let Some(cwd) = &proc.cwd {
161 spec.cwd(cwd);
162 } else {
163 spec.cwd(working_dir.to_string_lossy());
164 }
165 crate::task::proc_task::spawn_proc_task(pc, path, spec);
166 }
167}
168
169async fn dispatch_connection(
171 client_id: ClientId,
172 app_sender: crate::kernel::kernel_message::TaskSender,
173 pc: TaskContext,
174 console_task_id: crate::kernel::task::TaskId,
175 console_vt: crate::kernel::kernel_message::SharedVt,
176 mut sender: MsgSender<SrvToClt>,
177 mut receiver: MsgReceiver<CltToSrv>,
178) {
179 let first_msg = receiver.recv().await;
180 match first_msg {
181 Some(Ok(CltToSrv::Rpc(req))) => {
182 let resp = handle_rpc(&pc, req)
183 .await
184 .unwrap_or_else(|e| DkResponse::Error(e.to_string()));
185 let _ = sender.send(SrvToClt::Rpc(resp)).await;
186 }
187 Some(Ok(CltToSrv::Init { width, height })) => {
188 crate::console::spawn_client_task(
189 &pc,
190 console_task_id,
191 console_vt,
192 app_sender,
193 client_id,
194 Size { width, height },
195 sender,
196 receiver,
197 );
198 }
199 _ => {
200 log::warn!("Unexpected first message from client");
201 }
202 }
203}
204
205async fn handle_rpc(
206 pc: &TaskContext,
207 req: DkRequest,
208) -> anyhow::Result<DkResponse> {
209 let response = match req {
210 DkRequest::Ls { glob } => {
211 let query = KernelQuery::ListTasks(glob);
212 match pc.query(query).await? {
213 KernelQueryResponse::TaskList(tasks) => {
214 let items = tasks
215 .into_iter()
216 .map(|t| DkTaskInfo {
217 path: t
218 .path
219 .map(|p| p.to_string())
220 .unwrap_or_else(|| format!("<task:{}>", t.id.0)),
221 status: match t.status {
222 TaskStatus::Running => "running".to_string(),
223 TaskStatus::NotStarted => "not-started".to_string(),
224 TaskStatus::Exited(code) => format!("exited:{}", code),
225 },
226 })
227 .collect();
228 DkResponse::TaskList(items)
229 }
230 _ => DkResponse::Error("unexpected query response".to_string()),
231 }
232 }
233
234 DkRequest::Start { path } => {
235 let path = TaskPath::new(&path)?;
236 pc.send_to_path(path, TaskCmd::Start);
237 DkResponse::Ok
238 }
239
240 DkRequest::Stop { path } => {
241 let path = TaskPath::new(&path)?;
242 pc.send_to_path(path, TaskCmd::Stop);
243 DkResponse::Ok
244 }
245
246 DkRequest::Kill { path } => {
247 let path = TaskPath::new(&path)?;
248 pc.send_to_path(path, TaskCmd::Kill);
249 DkResponse::Ok
250 }
251
252 DkRequest::Restart { path } => {
253 let path = TaskPath::new(&path)?;
254 pc.send_to_path(path.clone(), TaskCmd::Stop);
255 pc.send_to_path(path, TaskCmd::Start);
256 DkResponse::Ok
257 }
258
259 DkRequest::Screen { path } => {
260 let path = TaskPath::new(&path)?;
261 let query = KernelQuery::GetScreen(path);
262 match pc.query(query).await? {
263 KernelQueryResponse::Screen(content) => DkResponse::Screen(content),
264 _ => DkResponse::Error("unexpected query response".to_string()),
265 }
266 }
267
268 DkRequest::Spawn { path, cmd, cwd } => {
269 let task_path = TaskPath::new(&path)?;
270 if cmd.is_empty() {
271 bail!("cmd must not be empty".to_string());
272 }
273 let mut spec = crate::process::process_spec::ProcessSpec::from_argv(cmd);
274 if let Some(cwd) = cwd {
275 spec.cwd(cwd);
276 } else if let Ok(cwd) = std::env::current_dir() {
277 spec.cwd(cwd.to_string_lossy());
278 }
279 crate::task::proc_task::spawn_proc_task(pc, task_path, spec);
280 DkResponse::Ok
281 }
282 };
283 Ok(response)
284}
285
286async fn rpc_request(
287 working_dir: &Path,
288 req: DkRequest,
289 spawn_server: bool,
290) -> anyhow::Result<DkResponse> {
291 let (mut sender, mut receiver) =
292 connect_client_socket::<CltToSrv, SrvToClt>(working_dir, spawn_server)
293 .await?;
294 sender.send(CltToSrv::Rpc(req)).await?;
295 match receiver.recv().await {
296 Some(Ok(SrvToClt::Rpc(resp))) => Ok(resp),
297 Some(Ok(other)) => {
298 anyhow::bail!("unexpected response: {:?}", other)
299 }
300 Some(Err(e)) => anyhow::bail!("decode error: {}", e),
301 None => anyhow::bail!("connection closed without response"),
302 }
303}
304
305pub async fn dekit_main() -> anyhow::Result<()> {
306 println!("* Welcome to dekit — playground for future features *\n");
307
308 let cmd = clap::command!()
309 .subcommands([
310 Command::new("attach"),
311 Command::new("up"),
312 Command::new("down"),
313 Command::new("spawn")
314 .about("Create and start a new task at the given path")
315 .arg(
316 Arg::new("path")
317 .long("path")
318 .required(true)
319 .help("Task path (e.g. /services/web)"),
320 )
321 .arg(
322 Arg::new("cwd")
323 .long("cwd")
324 .help("Working directory for the process"),
325 )
326 .arg(
327 Arg::new("cmd")
328 .required(true)
329 .num_args(1..)
330 .last(true)
331 .help("Command to run"),
332 ),
333 Command::new("ls")
334 .about("List tasks")
335 .arg(Arg::new("glob").help("Optional glob pattern")),
336 Command::new("start")
337 .about("Start a stopped task")
338 .arg(Arg::new("path").required(true).help("Task path")),
339 Command::new("stop")
340 .about("Gracefully stop a task")
341 .arg(Arg::new("path").required(true).help("Task path")),
342 Command::new("kill")
343 .about("Force kill a task")
344 .arg(Arg::new("path").required(true).help("Task path")),
345 Command::new("restart")
346 .about("Restart a task")
347 .arg(Arg::new("path").required(true).help("Task path")),
348 Command::new("screen")
349 .about("Print the current screen of a task")
350 .arg(Arg::new("path").required(true).help("Task path")),
351 Command::new("server").subcommands([
352 Command::new("run")
353 .arg(
354 Arg::new("dir")
355 .long("dir")
356 .required(true)
357 .help("Working directory this daemon manages"),
358 )
359 .arg(
360 Arg::new("log-level")
361 .long("log-level")
362 .help("Diagnostic log level (off|error|warn|info|debug|trace, or env_logger spec). Falls back to $DK_LOG, $RUST_LOG, then 'error' (release) or 'trace' (debug)."),
363 ),
364 Command::new("start")
365 .about("Start the daemon for the current directory"),
366 Command::new("stop").about("Stop the daemon for the current directory"),
367 Command::new("status")
368 .about("Show daemon status for the current directory"),
369 Command::new("list").about("List all daemons on this machine"),
370 Command::new("clean").about("Remove stale lock files"),
371 ]),
372 ])
373 .arg(
374 Arg::new("files")
375 .action(clap::ArgAction::Append)
376 .trailing_var_arg(true),
377 );
378 let matches = cmd.get_matches();
379
380 match matches.subcommand() {
381 Some(("attach", _sub_m)) => {
382 let working_dir = std::env::current_dir()?;
383 let (sender, receiver) =
384 connect_client_socket::<CltToSrv, SrvToClt>(&working_dir, false)
385 .await?;
386 client_main(sender, receiver).await?;
387 }
388 Some(("spawn", sub_m)) => {
389 let working_dir = std::env::current_dir()?;
390 let path = sub_m.get_one::<String>("path").unwrap().clone();
391 let cwd = sub_m.get_one::<String>("cwd").cloned();
392 let cmd: Vec<String> =
393 sub_m.get_many::<String>("cmd").unwrap().cloned().collect();
394 let resp =
395 rpc_request(&working_dir, DkRequest::Spawn { path, cmd, cwd }, true)
396 .await?;
397 match resp {
398 DkResponse::Ok => println!("Spawned."),
399 DkResponse::Error(e) => eprintln!("Error: {}", e),
400 _ => eprintln!("Unexpected response"),
401 }
402 }
403 Some(("ls", sub_m)) => {
404 let working_dir = std::env::current_dir()?;
405 let glob = sub_m.get_one::<String>("glob").cloned();
406 let resp =
407 rpc_request(&working_dir, DkRequest::Ls { glob }, false).await?;
408 print_task_list(resp);
409 }
410 Some(("start", sub_m)) => {
411 let working_dir = std::env::current_dir()?;
412 let path = sub_m.get_one::<String>("path").unwrap().clone();
413 let resp =
414 rpc_request(&working_dir, DkRequest::Start { path }, true).await?;
415 match resp {
416 DkResponse::Ok => println!("Started."),
417 DkResponse::Error(e) => eprintln!("Error: {}", e),
418 _ => eprintln!("Unexpected response"),
419 }
420 }
421 Some(("stop", sub_m)) => {
422 let working_dir = std::env::current_dir()?;
423 let path = sub_m.get_one::<String>("path").unwrap().clone();
424 let resp =
425 rpc_request(&working_dir, DkRequest::Stop { path }, false).await?;
426 match resp {
427 DkResponse::Ok => println!("Stopped."),
428 DkResponse::Error(e) => eprintln!("Error: {}", e),
429 _ => eprintln!("Unexpected response"),
430 }
431 }
432 Some(("kill", sub_m)) => {
433 let working_dir = std::env::current_dir()?;
434 let path = sub_m.get_one::<String>("path").unwrap().clone();
435 let resp =
436 rpc_request(&working_dir, DkRequest::Kill { path }, false).await?;
437 match resp {
438 DkResponse::Ok => println!("Killed."),
439 DkResponse::Error(e) => eprintln!("Error: {}", e),
440 _ => eprintln!("Unexpected response"),
441 }
442 }
443 Some(("restart", sub_m)) => {
444 let working_dir = std::env::current_dir()?;
445 let path = sub_m.get_one::<String>("path").unwrap().clone();
446 let resp =
447 rpc_request(&working_dir, DkRequest::Restart { path }, true).await?;
448 match resp {
449 DkResponse::Ok => println!("Restarted."),
450 DkResponse::Error(e) => eprintln!("Error: {}", e),
451 _ => eprintln!("Unexpected response"),
452 }
453 }
454 Some(("screen", sub_m)) => {
455 let working_dir = std::env::current_dir()?;
456 let path = sub_m.get_one::<String>("path").unwrap().clone();
457 let resp =
458 rpc_request(&working_dir, DkRequest::Screen { path }, false).await?;
459 match resp {
460 DkResponse::Screen(Some(content)) => {
461 print!("{}", content);
462 print!("\x1b[0m\n");
464 }
465 DkResponse::Screen(None) => {
466 eprintln!("No screen content for this task.");
467 }
468 DkResponse::Error(e) => eprintln!("Error: {}", e),
469 _ => eprintln!("Unexpected response"),
470 }
471 }
472 Some(("up", _sub_m)) => {
473 let working_dir = std::env::current_dir()?;
474 let resp =
475 rpc_request(&working_dir, DkRequest::Ls { glob: None }, true).await?;
476 print_task_list(resp);
477 }
478 Some(("down", _sub_m)) => {
479 let working_dir = std::env::current_dir()?;
480 lockfile::stop_daemon(&working_dir)?;
481 println!("Daemon stopped.");
482 }
483 Some(("server", sub_m)) => match sub_m.subcommand() {
484 Some(("run", run_m)) => {
485 let dir = run_m.get_one::<String>("dir").unwrap();
486 let log_level =
487 run_m.get_one::<String>("log-level").map(String::as_str);
488 run_server(PathBuf::from(dir), log_level).await?;
489 }
490 Some(("start", _sub_m)) => {
491 let working_dir = std::env::current_dir()?;
492 if let Some(info) = lockfile::get_daemon_status(&working_dir)? {
494 if info.is_running {
495 println!("Daemon already running (pid={}).", info.contents.pid);
496 return Ok(());
497 }
498 lockfile::cleanup_stale(&working_dir)?;
500 }
501 crate::daemon::daemon::spawn_server_daemon(&working_dir)?;
502 println!("Daemon started for {}.", working_dir.display());
503 }
504 Some(("stop", _sub_m)) => {
505 let working_dir = std::env::current_dir()?;
506 lockfile::stop_daemon(&working_dir)?;
507 println!("Daemon stopped.");
508 }
509 Some(("status", _sub_m)) => {
510 let working_dir = std::env::current_dir()?;
511 match lockfile::get_daemon_status(&working_dir)? {
512 Some(info) => {
513 let status = if info.is_running { "running" } else { "stale" };
514 println!(
515 "[{}] pid={} socket={} version={}",
516 status,
517 info.contents.pid,
518 info.contents.socket,
519 info.contents.version,
520 );
521 }
522 None => {
523 println!("No daemon for this directory.");
524 }
525 }
526 }
527 Some(("list", _sub_m)) => {
528 let daemons = lockfile::list_daemons()?;
529 if daemons.is_empty() {
530 println!("No daemons found.");
531 } else {
532 for d in &daemons {
533 let status = if d.is_running { "running" } else { "stale" };
534 println!(
535 "[{}] pid={} dir={} socket={} version={}",
536 status,
537 d.contents.pid,
538 d.contents.working_dir,
539 d.contents.socket,
540 d.contents.version,
541 );
542 }
543 }
544 }
545 Some(("clean", _sub_m)) => {
546 let count = lockfile::cleanup_all_stale()?;
547 println!("Removed {} stale lock file(s).", count);
548 }
549 _ => {
550 println!("Expected more arguments after `dk server`");
551 }
552 },
553 Some((arg, _sub_m)) => {
554 println!("Unknown: {}", arg);
555 }
556 None => {
557 let paths = matches
558 .get_many::<String>("files")
559 .map(|p| p.collect::<Vec<_>>())
560 .unwrap_or_default();
561
562 if let Some(first) = paths.first() {
563 if first.ends_with(".lua") {
565 let src = std::fs::read_to_string(first)?;
566
567 let lua = mlua::Lua::new();
568 let cancel = tokio_util::sync::CancellationToken::new();
569 lua.set_app_data(cancel.clone());
570 lua
571 .globals()
572 .set("std", init_std(&lua).map_err(|e| anyhow!("{}", e))?)
573 .map_err(|e| anyhow!("{}", e))?;
574
575 let chunk = lua.load(src.clone());
576 let f: mlua::Function = chunk.eval().map_err(|e| anyhow!("{}", e))?;
577 let r = f
578 .call_async::<mlua::Value>(())
579 .await
580 .map_err(|e| anyhow!("{}", e))?;
581 println!("-> {:?}", r);
582 cancel.cancel();
583 }
584 else if first.ends_with(".js") {
586 let src = std::fs::read_to_string(first)?;
587
588 let vm = JsVm::new().await?;
589 let root =
590 vm.eval_file(Path::new("dekit.js"), src.as_bytes()).await?;
591
592 let r: anyhow::Result<()> =
593 rquickjs::async_with!(vm.context => |ctx| {
594 run_module_main(&ctx, &root).await
595 })
596 .await;
597 r?;
598 }
599 } else {
600 let working_dir = std::env::current_dir()?;
603 let (sender, receiver) =
604 connect_client_socket::<CltToSrv, SrvToClt>(&working_dir, true)
605 .await?;
606 client_main(sender, receiver).await?;
607 }
608 }
609 }
610
611 Ok(())
612}
613
614async fn run_module_main(
615 ctx: &rquickjs::Ctx<'_>,
616 root: &rquickjs::Persistent<rquickjs::Object<'static>>,
617) -> anyhow::Result<()> {
618 let m = map_js_error(
619 ctx,
620 root.clone().restore(ctx),
621 "Failed to restore module namespace",
622 )?;
623 let main = map_js_error(
624 ctx,
625 m.get::<_, rquickjs::Value>("main"),
626 "Failed to read exported `main`",
627 )?;
628
629 let val = match main.type_of() {
630 rquickjs::Type::Constructor => map_js_error(
631 ctx,
632 main
633 .into_constructor()
634 .expect("Type checked as constructor")
635 .call::<_, rquickjs::Value>(()),
636 "Error while calling exported constructor `main`",
637 )?,
638 rquickjs::Type::Function => map_js_error(
639 ctx,
640 main
641 .into_function()
642 .expect("Type checked as function")
643 .call(()),
644 "Error while calling exported function `main`",
645 )?,
646 t => anyhow::bail!("Exported `main` is not a function ({}).", t.as_str()),
647 };
648
649 let val = if let Some(promise) = val.clone().into_promise() {
650 map_js_error(
651 ctx,
652 promise.into_future::<rquickjs::Value<'_>>().await,
653 "Unhandled rejection in exported `main`",
654 )?
655 } else {
656 val
657 };
658
659 println!("-> {:?}", val);
660 Ok(())
661}
662
663fn map_js_error<T>(
664 ctx: &rquickjs::Ctx<'_>,
665 result: rquickjs::Result<T>,
666 scope: &str,
667) -> anyhow::Result<T> {
668 result.catch(ctx).map_err(|err| anyhow!("{scope}:\n{err}"))
669}