Skip to main content

lib/
dekit.rs

1use std::path::{Path, PathBuf};
2
3use anyhow::{anyhow, bail};
4use clap::{Arg, Command};
5use rquickjs::CatchResultExt;
6
7use crate::mprocs::app::ClientId;
8use crate::{
9  client::client_main,
10  console::create_console_task,
11  daemon::{
12    lockfile,
13    receiver::MsgReceiver,
14    sender::MsgSender,
15    socket::{bind_server_socket, connect_client_socket},
16  },
17  js::js_vm::JsVm,
18  kernel::{
19    kernel::Kernel,
20    kernel_message::{
21      KernelCommand, KernelQuery, KernelQueryResponse, TaskContext,
22    },
23    task::{TaskCmd, TaskStatus},
24    task_path::TaskPath,
25  },
26  lualib::init_std,
27  protocol::{CltToSrv, DkRequest, DkResponse, DkTaskInfo, SrvToClt},
28  term::Size,
29};
30
31async fn run_server(
32  working_dir: PathBuf,
33  log_level: Option<&str>,
34) -> anyhow::Result<()> {
35  let _logger = crate::logging::init(crate::logging::Config {
36    binary: "dk",
37    cli_level: log_level,
38    log_env: "DK_LOG",
39    file_env: "DK_LOG_FILE",
40    default_dir: Some(&working_dir),
41  })?;
42
43  // Create lock file and acquire exclusive flock.
44  let lock_guard = lockfile::create_lock_file(&working_dir)?;
45  log::info!("Lock file created for directory: {}", working_dir.display());
46
47  #[cfg(unix)]
48  crate::process::unix_processes_waiter::UnixProcessesWaiter::init()?;
49  let kernel = Kernel::new();
50  let pc = kernel.context();
51
52  let socket_path = lock_guard.socket_path().to_path_buf();
53  let (app_task_id, console_vt) = create_console_task(&pc);
54  let app_sender = pc.get_task_sender(app_task_id);
55
56  spawn_configured_procs(&pc, &working_dir);
57
58  tokio::spawn(async move {
59    let mut last_client_id = 0;
60
61    let mut server_socket = match bind_server_socket(&socket_path).await {
62      Ok(server_socket) => {
63        log::info!("Server is listening.");
64        #[cfg(unix)]
65        {
66          server_socket
67        }
68        #[cfg(windows)]
69        {
70          let (sock, _addr) = server_socket;
71          sock
72        }
73      }
74      Err(err) => {
75        log::error!("Failed to bind the server: {:?}", err);
76        pc.send(KernelCommand::Quit);
77        return;
78      }
79    };
80    log::debug!("Waiting for clients...");
81    loop {
82      match server_socket.accept().await {
83        Ok((sender, receiver)) => {
84          last_client_id += 1;
85          let client_id = ClientId(last_client_id);
86          let app_sender = app_sender.clone();
87          let pc = pc.clone();
88          let console_task_id = app_task_id;
89          let console_vt = console_vt.clone();
90          tokio::spawn(async move {
91            dispatch_connection(
92              client_id,
93              app_sender,
94              pc,
95              console_task_id,
96              console_vt,
97              sender,
98              receiver,
99            )
100            .await;
101          });
102        }
103        Err(err) => {
104          log::debug!("Server socket accept error: {}", err);
105          break;
106        }
107      }
108    }
109  });
110
111  kernel.run().await;
112
113  // lock_guard is dropped here, removing lock + socket files.
114  drop(lock_guard);
115
116  #[cfg(unix)]
117  crate::process::unix_processes_waiter::UnixProcessesWaiter::uninit()?;
118
119  Ok(())
120}
121
122fn print_task_list(resp: DkResponse) {
123  match resp {
124    DkResponse::TaskList(tasks) => {
125      if tasks.is_empty() {
126        println!("No tasks.");
127      } else {
128        for t in &tasks {
129          println!("{}\t{}", t.path, t.status);
130        }
131      }
132    }
133    DkResponse::Error(e) => eprintln!("Error: {}", e),
134    _ => eprintln!("Unexpected response"),
135  }
136}
137
138fn spawn_configured_procs(pc: &TaskContext, working_dir: &Path) {
139  let config = match crate::config::Config::load(working_dir) {
140    Ok(cfg) => cfg,
141    Err(err) => {
142      log::warn!("Failed to load config: {}", err);
143      return;
144    }
145  };
146  for proc in &config.procs {
147    let path = match TaskPath::new(&format!("/{}", proc.name)) {
148      Ok(p) => p,
149      Err(err) => {
150        log::warn!("Invalid proc name {:?}: {}", proc.name, err);
151        continue;
152      }
153    };
154    if proc.cmd.is_empty() {
155      log::warn!("Proc {:?} has empty cmd; skipping", proc.name);
156      continue;
157    }
158    let mut spec =
159      crate::process::process_spec::ProcessSpec::from_argv(proc.cmd.clone());
160    if let Some(cwd) = &proc.cwd {
161      spec.cwd(cwd);
162    } else {
163      spec.cwd(working_dir.to_string_lossy());
164    }
165    crate::task::proc_task::spawn_proc_task(pc, path, spec);
166  }
167}
168
169/// Dispatch an accepted connection: RPC or TUI.
170async fn dispatch_connection(
171  client_id: ClientId,
172  app_sender: crate::kernel::kernel_message::TaskSender,
173  pc: TaskContext,
174  console_task_id: crate::kernel::task::TaskId,
175  console_vt: crate::kernel::kernel_message::SharedVt,
176  mut sender: MsgSender<SrvToClt>,
177  mut receiver: MsgReceiver<CltToSrv>,
178) {
179  let first_msg = receiver.recv().await;
180  match first_msg {
181    Some(Ok(CltToSrv::Rpc(req))) => {
182      let resp = handle_rpc(&pc, req)
183        .await
184        .unwrap_or_else(|e| DkResponse::Error(e.to_string()));
185      let _ = sender.send(SrvToClt::Rpc(resp)).await;
186    }
187    Some(Ok(CltToSrv::Init { width, height })) => {
188      crate::console::spawn_client_task(
189        &pc,
190        console_task_id,
191        console_vt,
192        app_sender,
193        client_id,
194        Size { width, height },
195        sender,
196        receiver,
197      );
198    }
199    _ => {
200      log::warn!("Unexpected first message from client");
201    }
202  }
203}
204
205async fn handle_rpc(
206  pc: &TaskContext,
207  req: DkRequest,
208) -> anyhow::Result<DkResponse> {
209  let response = match req {
210    DkRequest::Ls { glob } => {
211      let query = KernelQuery::ListTasks(glob);
212      match pc.query(query).await? {
213        KernelQueryResponse::TaskList(tasks) => {
214          let items = tasks
215            .into_iter()
216            .map(|t| DkTaskInfo {
217              path: t
218                .path
219                .map(|p| p.to_string())
220                .unwrap_or_else(|| format!("<task:{}>", t.id.0)),
221              status: match t.status {
222                TaskStatus::Running => "running".to_string(),
223                TaskStatus::NotStarted => "not-started".to_string(),
224                TaskStatus::Exited(code) => format!("exited:{}", code),
225              },
226            })
227            .collect();
228          DkResponse::TaskList(items)
229        }
230        _ => DkResponse::Error("unexpected query response".to_string()),
231      }
232    }
233
234    DkRequest::Start { path } => {
235      let path = TaskPath::new(&path)?;
236      pc.send_to_path(path, TaskCmd::Start);
237      DkResponse::Ok
238    }
239
240    DkRequest::Stop { path } => {
241      let path = TaskPath::new(&path)?;
242      pc.send_to_path(path, TaskCmd::Stop);
243      DkResponse::Ok
244    }
245
246    DkRequest::Kill { path } => {
247      let path = TaskPath::new(&path)?;
248      pc.send_to_path(path, TaskCmd::Kill);
249      DkResponse::Ok
250    }
251
252    DkRequest::Restart { path } => {
253      let path = TaskPath::new(&path)?;
254      pc.send_to_path(path.clone(), TaskCmd::Stop);
255      pc.send_to_path(path, TaskCmd::Start);
256      DkResponse::Ok
257    }
258
259    DkRequest::Screen { path } => {
260      let path = TaskPath::new(&path)?;
261      let query = KernelQuery::GetScreen(path);
262      match pc.query(query).await? {
263        KernelQueryResponse::Screen(content) => DkResponse::Screen(content),
264        _ => DkResponse::Error("unexpected query response".to_string()),
265      }
266    }
267
268    DkRequest::Spawn { path, cmd, cwd } => {
269      let task_path = TaskPath::new(&path)?;
270      if cmd.is_empty() {
271        bail!("cmd must not be empty".to_string());
272      }
273      let mut spec = crate::process::process_spec::ProcessSpec::from_argv(cmd);
274      if let Some(cwd) = cwd {
275        spec.cwd(cwd);
276      } else if let Ok(cwd) = std::env::current_dir() {
277        spec.cwd(cwd.to_string_lossy());
278      }
279      crate::task::proc_task::spawn_proc_task(pc, task_path, spec);
280      DkResponse::Ok
281    }
282  };
283  Ok(response)
284}
285
286async fn rpc_request(
287  working_dir: &Path,
288  req: DkRequest,
289  spawn_server: bool,
290) -> anyhow::Result<DkResponse> {
291  let (mut sender, mut receiver) =
292    connect_client_socket::<CltToSrv, SrvToClt>(working_dir, spawn_server)
293      .await?;
294  sender.send(CltToSrv::Rpc(req)).await?;
295  match receiver.recv().await {
296    Some(Ok(SrvToClt::Rpc(resp))) => Ok(resp),
297    Some(Ok(other)) => {
298      anyhow::bail!("unexpected response: {:?}", other)
299    }
300    Some(Err(e)) => anyhow::bail!("decode error: {}", e),
301    None => anyhow::bail!("connection closed without response"),
302  }
303}
304
305pub async fn dekit_main() -> anyhow::Result<()> {
306  println!("* Welcome to dekit — playground for future features *\n");
307
308  let cmd = clap::command!()
309    .subcommands([
310      Command::new("attach"),
311      Command::new("up"),
312      Command::new("down"),
313      Command::new("spawn")
314        .about("Create and start a new task at the given path")
315        .arg(
316          Arg::new("path")
317            .long("path")
318            .required(true)
319            .help("Task path (e.g. /services/web)"),
320        )
321        .arg(
322          Arg::new("cwd")
323            .long("cwd")
324            .help("Working directory for the process"),
325        )
326        .arg(
327          Arg::new("cmd")
328            .required(true)
329            .num_args(1..)
330            .last(true)
331            .help("Command to run"),
332        ),
333      Command::new("ls")
334        .about("List tasks")
335        .arg(Arg::new("glob").help("Optional glob pattern")),
336      Command::new("start")
337        .about("Start a stopped task")
338        .arg(Arg::new("path").required(true).help("Task path")),
339      Command::new("stop")
340        .about("Gracefully stop a task")
341        .arg(Arg::new("path").required(true).help("Task path")),
342      Command::new("kill")
343        .about("Force kill a task")
344        .arg(Arg::new("path").required(true).help("Task path")),
345      Command::new("restart")
346        .about("Restart a task")
347        .arg(Arg::new("path").required(true).help("Task path")),
348      Command::new("screen")
349        .about("Print the current screen of a task")
350        .arg(Arg::new("path").required(true).help("Task path")),
351      Command::new("server").subcommands([
352        Command::new("run")
353          .arg(
354            Arg::new("dir")
355              .long("dir")
356              .required(true)
357              .help("Working directory this daemon manages"),
358          )
359          .arg(
360            Arg::new("log-level")
361              .long("log-level")
362              .help("Diagnostic log level (off|error|warn|info|debug|trace, or env_logger spec). Falls back to $DK_LOG, $RUST_LOG, then 'error' (release) or 'trace' (debug)."),
363          ),
364        Command::new("start")
365          .about("Start the daemon for the current directory"),
366        Command::new("stop").about("Stop the daemon for the current directory"),
367        Command::new("status")
368          .about("Show daemon status for the current directory"),
369        Command::new("list").about("List all daemons on this machine"),
370        Command::new("clean").about("Remove stale lock files"),
371      ]),
372    ])
373    .arg(
374      Arg::new("files")
375        .action(clap::ArgAction::Append)
376        .trailing_var_arg(true),
377    );
378  let matches = cmd.get_matches();
379
380  match matches.subcommand() {
381    Some(("attach", _sub_m)) => {
382      let working_dir = std::env::current_dir()?;
383      let (sender, receiver) =
384        connect_client_socket::<CltToSrv, SrvToClt>(&working_dir, false)
385          .await?;
386      client_main(sender, receiver).await?;
387    }
388    Some(("spawn", sub_m)) => {
389      let working_dir = std::env::current_dir()?;
390      let path = sub_m.get_one::<String>("path").unwrap().clone();
391      let cwd = sub_m.get_one::<String>("cwd").cloned();
392      let cmd: Vec<String> =
393        sub_m.get_many::<String>("cmd").unwrap().cloned().collect();
394      let resp =
395        rpc_request(&working_dir, DkRequest::Spawn { path, cmd, cwd }, true)
396          .await?;
397      match resp {
398        DkResponse::Ok => println!("Spawned."),
399        DkResponse::Error(e) => eprintln!("Error: {}", e),
400        _ => eprintln!("Unexpected response"),
401      }
402    }
403    Some(("ls", sub_m)) => {
404      let working_dir = std::env::current_dir()?;
405      let glob = sub_m.get_one::<String>("glob").cloned();
406      let resp =
407        rpc_request(&working_dir, DkRequest::Ls { glob }, false).await?;
408      print_task_list(resp);
409    }
410    Some(("start", sub_m)) => {
411      let working_dir = std::env::current_dir()?;
412      let path = sub_m.get_one::<String>("path").unwrap().clone();
413      let resp =
414        rpc_request(&working_dir, DkRequest::Start { path }, true).await?;
415      match resp {
416        DkResponse::Ok => println!("Started."),
417        DkResponse::Error(e) => eprintln!("Error: {}", e),
418        _ => eprintln!("Unexpected response"),
419      }
420    }
421    Some(("stop", sub_m)) => {
422      let working_dir = std::env::current_dir()?;
423      let path = sub_m.get_one::<String>("path").unwrap().clone();
424      let resp =
425        rpc_request(&working_dir, DkRequest::Stop { path }, false).await?;
426      match resp {
427        DkResponse::Ok => println!("Stopped."),
428        DkResponse::Error(e) => eprintln!("Error: {}", e),
429        _ => eprintln!("Unexpected response"),
430      }
431    }
432    Some(("kill", sub_m)) => {
433      let working_dir = std::env::current_dir()?;
434      let path = sub_m.get_one::<String>("path").unwrap().clone();
435      let resp =
436        rpc_request(&working_dir, DkRequest::Kill { path }, false).await?;
437      match resp {
438        DkResponse::Ok => println!("Killed."),
439        DkResponse::Error(e) => eprintln!("Error: {}", e),
440        _ => eprintln!("Unexpected response"),
441      }
442    }
443    Some(("restart", sub_m)) => {
444      let working_dir = std::env::current_dir()?;
445      let path = sub_m.get_one::<String>("path").unwrap().clone();
446      let resp =
447        rpc_request(&working_dir, DkRequest::Restart { path }, true).await?;
448      match resp {
449        DkResponse::Ok => println!("Restarted."),
450        DkResponse::Error(e) => eprintln!("Error: {}", e),
451        _ => eprintln!("Unexpected response"),
452      }
453    }
454    Some(("screen", sub_m)) => {
455      let working_dir = std::env::current_dir()?;
456      let path = sub_m.get_one::<String>("path").unwrap().clone();
457      let resp =
458        rpc_request(&working_dir, DkRequest::Screen { path }, false).await?;
459      match resp {
460        DkResponse::Screen(Some(content)) => {
461          print!("{}", content);
462          // Reset terminal attributes after printing
463          print!("\x1b[0m\n");
464        }
465        DkResponse::Screen(None) => {
466          eprintln!("No screen content for this task.");
467        }
468        DkResponse::Error(e) => eprintln!("Error: {}", e),
469        _ => eprintln!("Unexpected response"),
470      }
471    }
472    Some(("up", _sub_m)) => {
473      let working_dir = std::env::current_dir()?;
474      let resp =
475        rpc_request(&working_dir, DkRequest::Ls { glob: None }, true).await?;
476      print_task_list(resp);
477    }
478    Some(("down", _sub_m)) => {
479      let working_dir = std::env::current_dir()?;
480      lockfile::stop_daemon(&working_dir)?;
481      println!("Daemon stopped.");
482    }
483    Some(("server", sub_m)) => match sub_m.subcommand() {
484      Some(("run", run_m)) => {
485        let dir = run_m.get_one::<String>("dir").unwrap();
486        let log_level =
487          run_m.get_one::<String>("log-level").map(String::as_str);
488        run_server(PathBuf::from(dir), log_level).await?;
489      }
490      Some(("start", _sub_m)) => {
491        let working_dir = std::env::current_dir()?;
492        // Check if already running.
493        if let Some(info) = lockfile::get_daemon_status(&working_dir)? {
494          if info.is_running {
495            println!("Daemon already running (pid={}).", info.contents.pid);
496            return Ok(());
497          }
498          // Stale -- clean up and start fresh.
499          lockfile::cleanup_stale(&working_dir)?;
500        }
501        crate::daemon::daemon::spawn_server_daemon(&working_dir)?;
502        println!("Daemon started for {}.", working_dir.display());
503      }
504      Some(("stop", _sub_m)) => {
505        let working_dir = std::env::current_dir()?;
506        lockfile::stop_daemon(&working_dir)?;
507        println!("Daemon stopped.");
508      }
509      Some(("status", _sub_m)) => {
510        let working_dir = std::env::current_dir()?;
511        match lockfile::get_daemon_status(&working_dir)? {
512          Some(info) => {
513            let status = if info.is_running { "running" } else { "stale" };
514            println!(
515              "[{}] pid={} socket={} version={}",
516              status,
517              info.contents.pid,
518              info.contents.socket,
519              info.contents.version,
520            );
521          }
522          None => {
523            println!("No daemon for this directory.");
524          }
525        }
526      }
527      Some(("list", _sub_m)) => {
528        let daemons = lockfile::list_daemons()?;
529        if daemons.is_empty() {
530          println!("No daemons found.");
531        } else {
532          for d in &daemons {
533            let status = if d.is_running { "running" } else { "stale" };
534            println!(
535              "[{}] pid={} dir={} socket={} version={}",
536              status,
537              d.contents.pid,
538              d.contents.working_dir,
539              d.contents.socket,
540              d.contents.version,
541            );
542          }
543        }
544      }
545      Some(("clean", _sub_m)) => {
546        let count = lockfile::cleanup_all_stale()?;
547        println!("Removed {} stale lock file(s).", count);
548      }
549      _ => {
550        println!("Expected more arguments after `dk server`");
551      }
552    },
553    Some((arg, _sub_m)) => {
554      println!("Unknown: {}", arg);
555    }
556    None => {
557      let paths = matches
558        .get_many::<String>("files")
559        .map(|p| p.collect::<Vec<_>>())
560        .unwrap_or_default();
561
562      if let Some(first) = paths.first() {
563        // .lua
564        if first.ends_with(".lua") {
565          let src = std::fs::read_to_string(first)?;
566
567          let lua = mlua::Lua::new();
568          let cancel = tokio_util::sync::CancellationToken::new();
569          lua.set_app_data(cancel.clone());
570          lua
571            .globals()
572            .set("std", init_std(&lua).map_err(|e| anyhow!("{}", e))?)
573            .map_err(|e| anyhow!("{}", e))?;
574
575          let chunk = lua.load(src.clone());
576          let f: mlua::Function = chunk.eval().map_err(|e| anyhow!("{}", e))?;
577          let r = f
578            .call_async::<mlua::Value>(())
579            .await
580            .map_err(|e| anyhow!("{}", e))?;
581          println!("-> {:?}", r);
582          cancel.cancel();
583        }
584        // .js
585        else if first.ends_with(".js") {
586          let src = std::fs::read_to_string(first)?;
587
588          let vm = JsVm::new().await?;
589          let root =
590            vm.eval_file(Path::new("dekit.js"), src.as_bytes()).await?;
591
592          let r: anyhow::Result<()> =
593            rquickjs::async_with!(vm.context => |ctx| {
594              run_module_main(&ctx, &root).await
595            })
596            .await;
597          r?;
598        }
599      } else {
600        // No args: connect to daemon for current dir, starting it on
601        // demand.
602        let working_dir = std::env::current_dir()?;
603        let (sender, receiver) =
604          connect_client_socket::<CltToSrv, SrvToClt>(&working_dir, true)
605            .await?;
606        client_main(sender, receiver).await?;
607      }
608    }
609  }
610
611  Ok(())
612}
613
614async fn run_module_main(
615  ctx: &rquickjs::Ctx<'_>,
616  root: &rquickjs::Persistent<rquickjs::Object<'static>>,
617) -> anyhow::Result<()> {
618  let m = map_js_error(
619    ctx,
620    root.clone().restore(ctx),
621    "Failed to restore module namespace",
622  )?;
623  let main = map_js_error(
624    ctx,
625    m.get::<_, rquickjs::Value>("main"),
626    "Failed to read exported `main`",
627  )?;
628
629  let val = match main.type_of() {
630    rquickjs::Type::Constructor => map_js_error(
631      ctx,
632      main
633        .into_constructor()
634        .expect("Type checked as constructor")
635        .call::<_, rquickjs::Value>(()),
636      "Error while calling exported constructor `main`",
637    )?,
638    rquickjs::Type::Function => map_js_error(
639      ctx,
640      main
641        .into_function()
642        .expect("Type checked as function")
643        .call(()),
644      "Error while calling exported function `main`",
645    )?,
646    t => anyhow::bail!("Exported `main` is not a function ({}).", t.as_str()),
647  };
648
649  let val = if let Some(promise) = val.clone().into_promise() {
650    map_js_error(
651      ctx,
652      promise.into_future::<rquickjs::Value<'_>>().await,
653      "Unhandled rejection in exported `main`",
654    )?
655  } else {
656    val
657  };
658
659  println!("-> {:?}", val);
660  Ok(())
661}
662
663fn map_js_error<T>(
664  ctx: &rquickjs::Ctx<'_>,
665  result: rquickjs::Result<T>,
666  scope: &str,
667) -> anyhow::Result<T> {
668  result.catch(ctx).map_err(|err| anyhow!("{scope}:\n{err}"))
669}