Skip to main content

kaizen/daemon/
client.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Daemon IPC client calls.
3
4use crate::ipc::{
5    CaptureStatus, ClientHello, ClientKind, DaemonRequest, DaemonResponse, ObservedSession,
6    PROTO_VERSION, ProxyEndpoint, ServerHello, read_frame, write_frame,
7};
8use anyhow::{Context, Result, anyhow};
9use std::path::Path;
10use tokio::net::UnixStream;
11
12pub fn request_blocking(request: DaemonRequest) -> Result<DaemonResponse> {
13    super::ensure_running()?;
14    tokio::runtime::Runtime::new()?.block_on(request_async(request))
15}
16
17fn request_blocking_for(workspace: &str, request: DaemonRequest) -> Result<DaemonResponse> {
18    super::ensure_running_for(Path::new(workspace))?;
19    tokio::runtime::Runtime::new()?.block_on(request_async(request))
20}
21
22pub fn hello_blocking(client: ClientKind, workspace: Option<String>) -> Result<ServerHello> {
23    let request = DaemonRequest::Hello(ClientHello {
24        proto_version: PROTO_VERSION,
25        client,
26        workspace: workspace.clone(),
27    });
28    let response = match workspace.as_deref() {
29        Some(workspace) => request_blocking_for(workspace, request),
30        None => request_blocking(request),
31    }?;
32    match response {
33        DaemonResponse::Hello(hello) => Ok(hello),
34        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
35        _ => Err(anyhow!("unexpected daemon hello response")),
36    }
37}
38
39pub fn ensure_capture_blocking(workspace: String, deep: bool) -> Result<CaptureStatus> {
40    let request = DaemonRequest::EnsureWorkspaceCapture {
41        workspace: workspace.clone(),
42        deep,
43    };
44    match request_blocking_for(&workspace, request)? {
45        DaemonResponse::CaptureStatus(status) => Ok(*status),
46        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
47        _ => Err(anyhow!("unexpected daemon capture response")),
48    }
49}
50
51pub fn ensure_proxy_blocking(workspace: String, provider: String) -> Result<ProxyEndpoint> {
52    let request = DaemonRequest::EnsureProxy {
53        workspace: workspace.clone(),
54        provider,
55    };
56    match request_blocking_for(&workspace, request)? {
57        DaemonResponse::ProxyEndpoint(endpoint) => Ok(endpoint),
58        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
59        _ => Err(anyhow!("unexpected daemon proxy response")),
60    }
61}
62
63pub fn begin_observed_session_blocking(
64    workspace: String,
65    agent: String,
66) -> Result<ObservedSession> {
67    let request = DaemonRequest::BeginObservedSession {
68        workspace: workspace.clone(),
69        agent,
70    };
71    match request_blocking_for(&workspace, request)? {
72        DaemonResponse::ObservedSession(session) => Ok(session),
73        DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
74        _ => Err(anyhow!("unexpected daemon observe response")),
75    }
76}
77
78pub(super) async fn request_async(request: DaemonRequest) -> Result<DaemonResponse> {
79    let paths = super::runtime_paths()?;
80    let mut stream = UnixStream::connect(&paths.sock)
81        .await
82        .with_context(|| format!("connect daemon socket: {}", paths.sock.display()))?;
83    write_frame(&mut stream, &request).await?;
84    read_frame(&mut stream).await
85}