use std::collections::HashSet;
#[cfg(unix)]
use std::fs;
#[cfg(unix)]
use std::os::unix::fs::FileTypeExt;
#[cfg(unix)]
use std::path::Path;
use std::time::Duration;
use anyhow::{Result, anyhow, bail};
use nvim_rs::{Handler, Neovim, compat::tokio::Compat, create::tokio as create};
use tokio::io::WriteHalf;
use tokio::time::timeout;
use tracing::debug;
use crate::config::{ResolvedConfig, TargetKind};
use crate::matcher::CaptureMap;
use crate::template::{Context, build_context, new_engine, render};
#[cfg(unix)]
type NvimConnection = tokio::net::UnixStream;
#[cfg(windows)]
type NvimConnection = tokio::net::windows::named_pipe::NamedPipeClient;
type NvimWriter = Compat<WriteHalf<NvimConnection>>;
#[derive(Clone)]
struct DummyHandler;
impl Handler for DummyHandler {
type Writer = NvimWriter;
}
const GROUP_SENTINEL: &str = "__TODOKE_GROUP_SENTINEL_QHj9G2__";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Instance {
pub target: String,
pub group: String,
pub listen: String,
pub alive: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ListenSkeleton {
pub(crate) prefix: String,
pub(crate) suffix: String,
}
impl ListenSkeleton {
pub(crate) fn from_template(cfg: &ResolvedConfig, template: &str) -> Result<Self> {
let rendered = render_with_group(cfg, template, GROUP_SENTINEL)?;
let Some((prefix, suffix)) = rendered.split_once(GROUP_SENTINEL) else {
bail!("listen template does not reference {{{{ group }}}}");
};
if suffix.contains(GROUP_SENTINEL) {
bail!("listen template references {{{{ group }}}} more than once");
}
if suffix.contains('/') {
bail!(
"listen template uses {{{{ group }}}} outside the final path component \
(suffix `{suffix}` contains `/`); enumeration walks one directory level only",
);
}
Ok(Self {
prefix: prefix.to_string(),
suffix: suffix.to_string(),
})
}
pub(crate) fn extract_group(&self, candidate: &str) -> Option<String> {
let rest = candidate.strip_prefix(&self.prefix)?;
let group = rest.strip_suffix(&self.suffix)?;
if group.is_empty() {
None
} else {
Some(group.to_string())
}
}
#[allow(dead_code)] pub(crate) fn render_for(&self, group: &str) -> String {
format!("{}{}{}", self.prefix, group, self.suffix)
}
}
fn render_with_group(cfg: &ResolvedConfig, template: &str, group: &str) -> Result<String> {
let mut tera = new_engine();
let cwd = std::env::current_dir()
.map(|p| p.to_string_lossy().into_owned())
.unwrap_or_default();
let cap = CaptureMap::new();
let ctx = build_context(Context {
input: None,
command: "",
cwd: &cwd,
group,
rule_name: "",
vars: &cfg.raw.vars,
cap: &cap,
passthrough: &[],
});
render(&mut tera, template, &ctx)
}
pub async fn discover(cfg: &ResolvedConfig) -> Vec<Instance> {
let mut tasks = tokio::task::JoinSet::new();
let mut seen: HashSet<String> = HashSet::new();
for (target_name, target) in &cfg.raw.todoke {
if target.kind != TargetKind::Neovim {
continue;
}
let Some(template) = target.listen.as_deref() else {
continue;
};
let skeleton = match ListenSkeleton::from_template(cfg, template) {
Ok(s) => s,
Err(e) => {
debug!(target = %target_name, reason = %e, "skipping target during discovery");
continue;
}
};
for path in enumerate_candidates(&skeleton) {
if !seen.insert(path.clone()) {
continue;
}
let Some(group) = skeleton.extract_group(&path) else {
continue;
};
let target_name = target_name.clone();
tasks.spawn(async move {
let alive = ping(&path).await;
Instance {
target: target_name,
group,
listen: path,
alive,
}
});
}
}
let mut out = Vec::with_capacity(tasks.len());
while let Some(res) = tasks.join_next().await {
if let Ok(inst) = res {
out.push(inst);
}
}
out.sort_by(|a, b| a.target.cmp(&b.target).then_with(|| a.group.cmp(&b.group)));
out
}
#[cfg(unix)]
fn enumerate_candidates(skeleton: &ListenSkeleton) -> Vec<String> {
let prefix_path = Path::new(&skeleton.prefix);
let (parent, basename_prefix) = if skeleton.prefix.ends_with('/') {
(prefix_path, "")
} else {
(
prefix_path
.parent()
.filter(|p| !p.as_os_str().is_empty())
.unwrap_or(Path::new(".")),
prefix_path
.file_name()
.and_then(|s| s.to_str())
.unwrap_or(""),
)
};
let basename_suffix = skeleton.suffix.as_str();
let Ok(entries) = fs::read_dir(parent) else {
return Vec::new();
};
let mut out = Vec::new();
for ent in entries.flatten() {
let Ok(file_type) = ent.file_type() else {
continue;
};
if !file_type.is_socket() {
continue;
}
let name = ent.file_name();
let Some(name_s) = name.to_str() else {
continue;
};
if name_s.starts_with(basename_prefix)
&& name_s.ends_with(basename_suffix)
&& name_s.len() > basename_prefix.len() + basename_suffix.len()
{
out.push(ent.path().to_string_lossy().into_owned());
}
}
out
}
#[cfg(windows)]
fn enumerate_candidates(skeleton: &ListenSkeleton) -> Vec<String> {
use std::ffi::OsStr;
use std::os::windows::ffi::{OsStrExt, OsStringExt};
use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;
use windows_sys::Win32::Storage::FileSystem::{
FindClose, FindFirstFileW, FindNextFileW, WIN32_FIND_DATAW,
};
let pattern: Vec<u16> = OsStr::new(r"\\.\pipe\*")
.encode_wide()
.chain(std::iter::once(0))
.collect();
let mut data: WIN32_FIND_DATAW = unsafe { std::mem::zeroed() };
let handle = unsafe { FindFirstFileW(pattern.as_ptr(), &mut data) };
if handle == INVALID_HANDLE_VALUE {
return Vec::new();
}
let prefix = skeleton.prefix.as_str();
let suffix = skeleton.suffix.as_str();
let mut out = Vec::new();
loop {
let len = data
.cFileName
.iter()
.position(|&c| c == 0)
.unwrap_or(data.cFileName.len());
let name = std::ffi::OsString::from_wide(&data.cFileName[..len])
.to_string_lossy()
.into_owned();
let full = format!(r"\\.\pipe\{name}");
if full.starts_with(prefix)
&& full.ends_with(suffix)
&& full.len() > prefix.len() + suffix.len()
{
out.push(full);
}
if unsafe { FindNextFileW(handle, &mut data) } == 0 {
break;
}
}
unsafe {
FindClose(handle);
}
out
}
const PROBE_TIMEOUT: Duration = Duration::from_millis(500);
const QUIT_GRACE: Duration = Duration::from_millis(800);
async fn ping(listen: &str) -> bool {
timeout(PROBE_TIMEOUT, create::new_path(listen, DummyHandler))
.await
.is_ok_and(|r| r.is_ok())
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KillOutcome {
Quit,
StillAlive,
Forced { pid: u32 },
}
const MAX_FORCE_ROUNDS: usize = 8;
pub async fn kill_instance(listen: &str, force: bool) -> Result<KillOutcome> {
let (nvim, io_handle) = timeout(PROBE_TIMEOUT, create::new_path(listen, DummyHandler))
.await
.map_err(|_| anyhow!("connect timed out after {:?}", PROBE_TIMEOUT))?
.map_err(|e| anyhow!("RPC connect failed: {e}"))?;
let mut next_pid = if force {
capture_pid(&nvim, listen).await
} else {
None
};
let _ = timeout(QUIT_GRACE, nvim.command("qall!")).await;
drop(nvim);
let _ = timeout(QUIT_GRACE, io_handle).await;
if !ping(listen).await {
return Ok(KillOutcome::Quit);
}
if !force {
return Ok(KillOutcome::StillAlive);
}
let mut forced_pid: Option<u32> = None;
for _ in 0..MAX_FORCE_ROUNDS {
let Some(pid) = next_pid else {
if forced_pid.is_none() {
return Err(anyhow!(
"qall! did not take effect and PID lookup failed; cannot --force"
));
}
break;
};
if os_kill(pid).is_ok() {
forced_pid.get_or_insert(pid);
}
let _ = cleanup_stale(listen);
match timeout(PROBE_TIMEOUT, create::new_path(listen, DummyHandler)).await {
Ok(Ok((nvim, io_handle))) => {
next_pid = capture_pid(&nvim, listen).await;
drop(nvim);
let _ = timeout(QUIT_GRACE, io_handle).await;
}
_ => {
return Ok(match forced_pid {
Some(pid) => KillOutcome::Forced { pid },
None => KillOutcome::Quit,
});
}
}
}
if !ping(listen).await {
return Ok(match forced_pid {
Some(pid) => KillOutcome::Forced { pid },
None => KillOutcome::Quit,
});
}
if let Some(pid) = forced_pid {
return Err(anyhow!(
"force-kill incomplete: pipe still alive after killing pid={pid}; \
additional listeners may serve the same path"
));
}
Err(anyhow!(
"force-kill exhausted after {MAX_FORCE_ROUNDS} rounds; listen path still alive"
))
}
pub fn cleanup_stale(listen: &str) -> Result<bool> {
cleanup_stale_inner(listen)
}
#[cfg(unix)]
fn cleanup_stale_inner(listen: &str) -> Result<bool> {
match std::fs::remove_file(listen) {
Ok(()) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(true),
Err(e) => Err(anyhow!("{e}")),
}
}
#[cfg(windows)]
fn cleanup_stale_inner(_listen: &str) -> Result<bool> {
Ok(false)
}
#[cfg(unix)]
fn os_kill(pid: u32) -> Result<()> {
let pid_t =
libc::pid_t::try_from(pid).map_err(|_| anyhow!("pid {pid} does not fit in libc::pid_t"))?;
let rc = unsafe { libc::kill(pid_t, libc::SIGKILL) };
if rc == 0 {
Ok(())
} else {
Err(anyhow!(
"SIGKILL pid={pid} failed: {}",
std::io::Error::last_os_error()
))
}
}
async fn capture_pid(nvim: &Neovim<NvimWriter>, listen: &str) -> Option<u32> {
let from_eval = match timeout(PROBE_TIMEOUT, nvim.eval("getpid()")).await {
Ok(Ok(v)) => v.as_i64().and_then(|n| u32::try_from(n).ok()),
_ => None,
};
if from_eval.is_some() {
return from_eval;
}
pid_from_listen(listen).await
}
#[cfg(windows)]
async fn pid_from_listen(listen: &str) -> Option<u32> {
let listen = listen.to_string();
tokio::task::spawn_blocking(move || pid_from_listen_blocking(&listen))
.await
.ok()
.flatten()
}
#[cfg(windows)]
fn pid_from_listen_blocking(listen: &str) -> Option<u32> {
use std::ffi::OsStr;
use std::os::windows::ffi::OsStrExt;
use windows_sys::Win32::Foundation::{CloseHandle, INVALID_HANDLE_VALUE};
use windows_sys::Win32::Storage::FileSystem::{CreateFileW, FILE_GENERIC_READ, OPEN_EXISTING};
use windows_sys::Win32::System::Pipes::GetNamedPipeServerProcessId;
if !listen.starts_with(r"\\.\pipe\") {
return None;
}
let wide: Vec<u16> = OsStr::new(listen)
.encode_wide()
.chain(std::iter::once(0))
.collect();
let handle = unsafe {
CreateFileW(
wide.as_ptr(),
FILE_GENERIC_READ,
0,
std::ptr::null(),
OPEN_EXISTING,
0,
std::ptr::null_mut(),
)
};
if handle == INVALID_HANDLE_VALUE {
return None;
}
let mut pid: u32 = 0;
let ok = unsafe { GetNamedPipeServerProcessId(handle, &mut pid) };
unsafe {
CloseHandle(handle);
}
if ok == 0 || pid == 0 { None } else { Some(pid) }
}
#[cfg(unix)]
async fn pid_from_listen(_listen: &str) -> Option<u32> {
None
}
#[cfg(windows)]
fn os_kill(pid: u32) -> Result<()> {
use windows_sys::Win32::Foundation::{CloseHandle, FALSE};
use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_TERMINATE, TerminateProcess};
let handle = unsafe { OpenProcess(PROCESS_TERMINATE, FALSE, pid) };
if handle.is_null() {
return Err(anyhow!(
"OpenProcess(pid={pid}) failed: {}",
std::io::Error::last_os_error()
));
}
let rc = unsafe { TerminateProcess(handle, 1) };
let term_err = std::io::Error::last_os_error();
unsafe {
CloseHandle(handle);
}
if rc == 0 {
Err(anyhow!("TerminateProcess(pid={pid}) failed: {term_err}"))
} else {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::load_from_str;
fn cfg(src: &str) -> ResolvedConfig {
load_from_str(src).expect("config parses")
}
fn min_cfg() -> ResolvedConfig {
cfg(r#"
[todoke.dummy]
command = "true"
[[rules]]
match = '.*'
to = "dummy"
"#)
}
#[test]
fn skeleton_extracts_prefix_and_suffix_around_group() {
let c = min_cfg();
let s = ListenSkeleton::from_template(&c, "/tmp/nvim-todoke-{{ group }}.sock").unwrap();
assert_eq!(s.prefix, "/tmp/nvim-todoke-");
assert_eq!(s.suffix, ".sock");
}
#[test]
fn skeleton_handles_windows_branch_via_is_windows() {
let c = min_cfg();
let template = r"{% if is_windows() %}\\.\pipe\nvim-todoke-{{ group }}{% else %}/tmp/nvim-todoke-{{ group }}.sock{% endif %}";
let s = ListenSkeleton::from_template(&c, template).unwrap();
if cfg!(target_os = "windows") {
assert_eq!(s.prefix, r"\\.\pipe\nvim-todoke-");
assert_eq!(s.suffix, "");
} else {
assert_eq!(s.prefix, "/tmp/nvim-todoke-");
assert_eq!(s.suffix, ".sock");
}
}
#[test]
fn skeleton_errors_when_group_is_missing() {
let c = min_cfg();
let err = ListenSkeleton::from_template(&c, "/tmp/fixed.sock").unwrap_err();
assert!(err.to_string().contains("does not reference"));
}
#[test]
fn skeleton_errors_when_group_appears_twice() {
let c = min_cfg();
let err =
ListenSkeleton::from_template(&c, "/tmp/{{ group }}/{{ group }}.sock").unwrap_err();
assert!(err.to_string().contains("more than once"));
}
#[test]
fn skeleton_errors_when_group_is_not_in_final_path_component() {
let c = min_cfg();
let err = ListenSkeleton::from_template(&c, "/tmp/{{ group }}/nvim.sock").unwrap_err();
assert!(err.to_string().contains("final path component"));
}
#[test]
fn extract_group_recovers_value_when_shape_matches() {
let s = ListenSkeleton {
prefix: "/tmp/nvim-todoke-".into(),
suffix: ".sock".into(),
};
assert_eq!(
s.extract_group("/tmp/nvim-todoke-default.sock"),
Some("default".to_string()),
);
assert_eq!(
s.extract_group("/tmp/nvim-todoke-git-commit.sock"),
Some("git-commit".to_string()),
);
}
#[test]
fn extract_group_rejects_non_matching_paths() {
let s = ListenSkeleton {
prefix: "/tmp/nvim-todoke-".into(),
suffix: ".sock".into(),
};
assert_eq!(s.extract_group("/tmp/other.sock"), None);
assert_eq!(s.extract_group("/tmp/nvim-todoke-default.txt"), None);
assert_eq!(s.extract_group("/tmp/nvim-todoke-.sock"), None);
}
#[test]
fn render_for_round_trips_with_extract_group() {
let s = ListenSkeleton {
prefix: "/tmp/nvim-todoke-".into(),
suffix: ".sock".into(),
};
let path = s.render_for("scratch");
assert_eq!(path, "/tmp/nvim-todoke-scratch.sock");
assert_eq!(s.extract_group(&path), Some("scratch".to_string()));
}
#[test]
fn skeleton_resolves_vars_references() {
let c = cfg(r#"
[vars]
base = "/var/run/todoke"
[todoke.nvim]
kind = "neovim"
command = "nvim"
listen = "{{ vars.base }}/nvim-{{ group }}.sock"
[[rules]]
match = '.*'
to = "nvim"
"#);
let template = c.raw.todoke["nvim"].listen.as_deref().unwrap();
let s = ListenSkeleton::from_template(&c, template).unwrap();
assert_eq!(s.prefix, "/var/run/todoke/nvim-");
assert_eq!(s.suffix, ".sock");
}
#[cfg(unix)]
mod discovery_unix {
use super::*;
use std::fs::File;
use std::os::unix::net::UnixListener;
use std::path::PathBuf;
fn make_stale_socket(path: &std::path::Path) {
let _l = UnixListener::bind(path).expect("bind unix socket");
}
fn unique_tempdir() -> PathBuf {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let stamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let pid = std::process::id();
let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
let d = PathBuf::from("/tmp").join(format!("todoke-registry-{stamp}-{pid}-{seq}"));
std::fs::create_dir_all(&d).unwrap();
d
}
fn cfg_with_tmpdir(tmp: &std::path::Path) -> ResolvedConfig {
let src = format!(
r#"
[vars]
tmp = "{tmp}"
[todoke.nvim]
kind = "neovim"
command = "nvim"
listen = "{{{{ vars.tmp }}}}/nvim-todoke-{{{{ group }}}}.sock"
[[rules]]
match = '.*'
to = "nvim"
"#,
tmp = tmp.display(),
);
cfg(&src)
}
#[tokio::test]
async fn discover_finds_matching_filesystem_entries() {
let tmp = unique_tempdir();
make_stale_socket(&tmp.join("nvim-todoke-default.sock"));
make_stale_socket(&tmp.join("nvim-todoke-git.sock"));
make_stale_socket(&tmp.join("other.sock"));
File::create(tmp.join("nvim-todoke-default.txt")).unwrap();
let cfg = cfg_with_tmpdir(&tmp);
let instances = discover(&cfg).await;
let groups: Vec<&str> = instances.iter().map(|i| i.group.as_str()).collect();
assert_eq!(groups, vec!["default", "git"]);
for inst in &instances {
assert_eq!(inst.target, "nvim");
assert!(!inst.alive, "no real nvim → alive must be false");
}
}
#[tokio::test]
async fn discover_filters_out_regular_files_matching_the_pattern() {
let tmp = unique_tempdir();
make_stale_socket(&tmp.join("nvim-todoke-real.sock"));
File::create(tmp.join("nvim-todoke-imposter.sock")).unwrap();
let cfg = cfg_with_tmpdir(&tmp);
let instances = discover(&cfg).await;
let groups: Vec<&str> = instances.iter().map(|i| i.group.as_str()).collect();
assert_eq!(groups, vec!["real"]);
}
#[tokio::test]
async fn discover_returns_empty_when_no_targets_use_neovim() {
let src = r#"
[todoke.echo]
command = "echo"
[[rules]]
match = '.*'
to = "echo"
"#;
let cfg = cfg(src);
assert!(discover(&cfg).await.is_empty());
}
#[tokio::test]
async fn cleanup_stale_unlinks_existing_socket_file() {
let tmp = unique_tempdir();
let path = tmp.join("nvim-todoke-default.sock");
std::fs::File::create(&path).unwrap();
assert!(path.exists());
let removed = cleanup_stale(&path.to_string_lossy()).unwrap();
assert!(removed, "Unix should report removed = true");
assert!(!path.exists(), "socket file should be gone");
}
#[tokio::test]
async fn cleanup_stale_is_idempotent_on_missing_file() {
let tmp = unique_tempdir();
let path = tmp.join("never-existed.sock");
let removed = cleanup_stale(&path.to_string_lossy()).unwrap();
assert!(removed);
}
#[tokio::test]
async fn discover_skips_targets_without_group_in_listen() {
let tmp = unique_tempdir();
let src = format!(
r#"
[vars]
tmp = "{tmp}"
[todoke.nvim]
kind = "neovim"
command = "nvim"
listen = "{{{{ vars.tmp }}}}/fixed.sock"
[[rules]]
match = '.*'
to = "nvim"
"#,
tmp = tmp.display(),
);
File::create(tmp.join("fixed.sock")).unwrap();
let cfg = cfg(&src);
assert!(discover(&cfg).await.is_empty());
}
async fn spawn_stalling_server(path: &std::path::Path) -> tokio::task::JoinHandle<()> {
let listener = tokio::net::UnixListener::bind(path).expect("bind stalled server");
tokio::spawn(async move {
let mut held = Vec::new();
while let Ok((stream, _)) = listener.accept().await {
held.push(stream);
}
})
}
#[tokio::test]
async fn kill_instance_does_not_hang_on_unresponsive_server() {
let tmp = unique_tempdir();
let path = tmp.join("stalled.sock");
let server = spawn_stalling_server(&path).await;
let listen = path.to_string_lossy().into_owned();
let outer = tokio::time::timeout(
std::time::Duration::from_secs(15),
kill_instance(&listen, true),
)
.await;
server.abort();
assert!(
outer.is_ok(),
"kill_instance hung past 15s on unresponsive server"
);
let inner = outer.unwrap();
assert!(
inner.is_err(),
"expected an Err from kill_instance against stalled server, got {inner:?}"
);
}
#[tokio::test]
async fn kill_instance_does_not_hang_without_force() {
let tmp = unique_tempdir();
let path = tmp.join("stalled-noforce.sock");
let server = spawn_stalling_server(&path).await;
let listen = path.to_string_lossy().into_owned();
let outer = tokio::time::timeout(
std::time::Duration::from_secs(10),
kill_instance(&listen, false),
)
.await;
server.abort();
assert!(
outer.is_ok(),
"kill_instance(force=false) hung past 10s on unresponsive server"
);
let inner = outer.unwrap();
assert_eq!(inner.ok(), Some(KillOutcome::StillAlive));
}
}
}