Skip to main content

vs_daemon/server/
mod.rs

1//! Cross-platform local-socket server: accept connections, dispatch
2//! wire requests, write wire responses.
3//!
4//! Uses [`interprocess`] to abstract the platform IPC primitive:
5//! AF_UNIX socket files on Unix, named pipes on Windows. Each
6//! connection is a separate Tokio task. Per-primitive handlers live
7//! in submodules; this file owns the listener loop, the
8//! per-connection reader, and the dispatch table.
9
10mod engine_ops;
11mod helpers;
12mod lifecycle;
13mod page_ops;
14mod store_ops;
15
16use std::path::Path;
17use std::sync::Arc;
18
19use interprocess::local_socket::tokio::{prelude::*, Listener, Stream};
20use interprocess::local_socket::ListenerOptions;
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22
23use vs_protocol::{ErrorCode, Request};
24
25use crate::daemon::Daemon;
26use helpers::format_error;
27
28/// Bind a local socket at `path` and serve `daemon` on it. On Unix
29/// `path` is the AF_UNIX socket file; on Windows it's used to derive
30/// a stable namespaced pipe name (see [`crate::transport::path_to_name`]).
31/// Loops until `shutdown` resolves.
32pub async fn serve(
33    daemon: Daemon,
34    path: impl AsRef<Path>,
35    mut shutdown: tokio::sync::oneshot::Receiver<()>,
36) -> std::io::Result<()> {
37    let path = path.as_ref();
38    // On Unix, a stale socket file blocks bind. Best-effort cleanup.
39    #[cfg(unix)]
40    if path.exists() {
41        let _ = std::fs::remove_file(path);
42    }
43    let name = crate::transport::path_to_name(path).map_err(|e| {
44        tracing::error!(?path, error = %e, "could not derive ipc name from socket path");
45        e
46    })?;
47    let listener: Listener = ListenerOptions::new()
48        .name(name)
49        .create_tokio()
50        .map_err(|e| {
51            // On Unix, AF_UNIX sun_path is 104 bytes — paths beyond
52            // that fail with ENAMETOOLONG. Log the path + the OS
53            // error so callers can see why the socket never
54            // appeared instead of the daemon dying silently.
55            tracing::error!(
56                ?path,
57                len = path.as_os_str().len(),
58                error = %e,
59                "failed to bind local socket"
60            );
61            e
62        })?;
63    tracing::info!(?path, "vibesurfer daemon listening");
64
65    let daemon = Arc::new(daemon);
66    loop {
67        tokio::select! {
68            biased;
69            _ = &mut shutdown => {
70                tracing::info!("shutdown requested");
71                break;
72            }
73            accept = listener.accept() => {
74                let stream = accept?;
75                let daemon = daemon.clone();
76                tokio::spawn(async move {
77                    if let Err(e) = handle_connection(daemon, stream).await {
78                        tracing::warn!(error = %e, "connection ended");
79                    }
80                });
81            }
82        }
83    }
84
85    #[cfg(unix)]
86    let _ = std::fs::remove_file(path);
87    Ok(())
88}
89
90/// Drive one client connection: read lines, dispatch, write responses.
91async fn handle_connection(daemon: Arc<Daemon>, stream: Stream) -> std::io::Result<()> {
92    let (read, mut write) = stream.split();
93    let mut reader = BufReader::new(read).lines();
94    while let Some(line) = reader.next_line().await? {
95        if line.is_empty() {
96            continue;
97        }
98        let resp_text = match Request::parse(&line) {
99            Ok(req) => {
100                tracing::info!(primitive = %req.primitive, "dispatch start");
101                let primitive = req.primitive.clone();
102                let daemon = daemon.clone();
103                let result = tokio::task::spawn_blocking(move || {
104                    let mut outcomes = daemon.dispatch(&[req]);
105                    outcomes.pop().map_or_else(String::new, |o| o.wire)
106                })
107                .await;
108                tracing::info!(primitive = %primitive, ok = result.is_ok(), "dispatch end");
109                result.unwrap_or_else(|join_err| {
110                    tracing::error!(primitive = %primitive, error = %join_err, "dispatch panic");
111                    format_error(
112                        ErrorCode::EngineCrash,
113                        vec![format!("dispatch panic: {join_err}")],
114                    )
115                })
116            }
117            Err(parse_err) => format_error(ErrorCode::BadRequest, vec![format!("{parse_err}")]),
118        };
119        write.write_all(resp_text.as_bytes()).await?;
120        write.write_all(b"\n").await?;
121    }
122    Ok(())
123}
124
125/// Translate a parsed [`Request`] into a wire response (warnings +
126/// envelope + body, terminated by `\n` per the protocol spec — the
127/// caller adds the final blank line).
128#[must_use]
129pub fn dispatch(daemon: &Daemon, req: &Request) -> String {
130    match req.primitive.as_str() {
131        "vs_session_open" => lifecycle::handle_session_open(daemon, req),
132        "vs_session_close" => lifecycle::handle_session_close(daemon, req),
133        "vs_open" => lifecycle::handle_open(daemon, req),
134        "vs_close" => lifecycle::handle_close(daemon, req),
135        "vs_view" => page_ops::handle_view(daemon, req),
136        "vs_read" => page_ops::handle_read(daemon, req),
137        "vs_act" => page_ops::handle_act(daemon, req),
138        "vs_find" => page_ops::handle_find(daemon, req),
139        "vs_wait" => page_ops::handle_wait(daemon, req),
140        "vs_status" => page_ops::handle_status(daemon, req),
141        "vs_extract" => store_ops::handle_extract(daemon, req),
142        "vs_mark" => store_ops::handle_mark(daemon, req),
143        "vs_annotate" => store_ops::handle_annotate(daemon, req),
144        "vs_log" => store_ops::handle_log(daemon, req),
145        "vs_skill" => engine_ops::handle_skill(daemon, req),
146        "vs_capture" => engine_ops::handle_capture(daemon, req),
147        "vs_viewport" => engine_ops::handle_viewport(daemon, req),
148        "vs_layout" => engine_ops::handle_layout(daemon, req),
149        "vs_auth" => engine_ops::handle_auth(daemon, req),
150        "vs_inspect" => engine_ops::handle_inspect(daemon, req),
151        other => format_error(
152            ErrorCode::BadRequest,
153            vec![format!("unknown primitive: {other}")],
154        ),
155    }
156}