Skip to main content

aiclient_api/daemon/
control.rs

1use anyhow::{Context, Result};
2use tokio::io::{AsyncReadExt, AsyncWriteExt};
3use tokio::net::UnixListener;
4
5use crate::server::state::AppState;
6
7pub async fn start_control_server(state: AppState) -> Result<()> {
8    let socket_path = crate::util::xdg::socket_path();
9
10    // Ensure parent directory exists
11    if let Some(parent) = socket_path.parent() {
12        std::fs::create_dir_all(parent)?;
13    }
14
15    // Remove existing socket if present
16    match std::fs::remove_file(&socket_path) {
17        Ok(()) => {}
18        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
19        Err(e) => return Err(anyhow::anyhow!(e).context("Failed to remove existing socket file")),
20    }
21
22    let listener = UnixListener::bind(&socket_path)
23        .with_context(|| format!("Failed to bind Unix socket at {}", socket_path.display()))?;
24
25    tracing::info!("Control server listening on {}", socket_path.display());
26
27    loop {
28        match listener.accept().await {
29            Ok((stream, _addr)) => {
30                let state = state.clone();
31                tokio::spawn(async move {
32                    if let Err(e) = handle_connection(stream, state).await {
33                        tracing::warn!("Control connection error: {:#}", e);
34                    }
35                });
36            }
37            Err(e) => {
38                tracing::error!("Failed to accept control connection: {:#}", e);
39            }
40        }
41    }
42}
43
44async fn handle_connection(
45    mut stream: tokio::net::UnixStream,
46    state: AppState,
47) -> Result<()> {
48    // Read length-prefixed JSON request
49    let mut len_buf = [0u8; 4];
50    stream.read_exact(&mut len_buf).await.context("Failed to read request length")?;
51    let len = u32::from_be_bytes(len_buf) as usize;
52
53    if len > 1024 * 1024 {
54        anyhow::bail!("Request too large: {} bytes", len);
55    }
56
57    let mut req_buf = vec![0u8; len];
58    stream.read_exact(&mut req_buf).await.context("Failed to read request body")?;
59
60    let request: serde_json::Value = serde_json::from_slice(&req_buf)
61        .context("Failed to parse request JSON")?;
62
63    let response = dispatch_request(request, &state).await;
64
65    // Write length-prefixed JSON response
66    let resp_bytes = serde_json::to_vec(&response)?;
67    stream.write_all(&(resp_bytes.len() as u32).to_be_bytes()).await?;
68    stream.write_all(&resp_bytes).await?;
69    stream.flush().await?;
70
71    Ok(())
72}
73
74async fn dispatch_request(
75    request: serde_json::Value,
76    state: &AppState,
77) -> serde_json::Value {
78    let method = match request.get("method").and_then(|v| v.as_str()) {
79        Some(m) => m.to_string(),
80        None => {
81            return serde_json::json!({
82                "ok": false,
83                "error": "Missing 'method' field in request"
84            });
85        }
86    };
87
88    match method.as_str() {
89        "status" => handle_status(state).await,
90        "config.show" => handle_config_show(state).await,
91        "config.reload" => handle_config_reload(state).await,
92        "models" => handle_models(state).await,
93        "provider.enable" => {
94            let name = request
95                .get("params")
96                .and_then(|p| p.get("name"))
97                .and_then(|n| n.as_str())
98                .unwrap_or("")
99                .to_string();
100            handle_provider_enable(name).await
101        }
102        "provider.disable" => {
103            let name = request
104                .get("params")
105                .and_then(|p| p.get("name"))
106                .and_then(|n| n.as_str())
107                .unwrap_or("")
108                .to_string();
109            handle_provider_disable(name).await
110        }
111        "config.set" => {
112            let params = request.get("params");
113            let key = params
114                .and_then(|p| p.get("key"))
115                .and_then(|v| v.as_str())
116                .unwrap_or("")
117                .to_string();
118            let value = params
119                .and_then(|p| p.get("value"))
120                .cloned()
121                .unwrap_or(serde_json::Value::Null);
122            handle_config_set(key, value).await
123        }
124        "logs.tail" => {
125            let n = request
126                .get("params")
127                .and_then(|p| p.get("lines"))
128                .and_then(|v| v.as_u64())
129                .unwrap_or(50) as usize;
130            handle_logs_tail(n).await
131        }
132        unknown => {
133            serde_json::json!({
134                "ok": false,
135                "error": format!("Unknown method: {}", unknown)
136            })
137        }
138    }
139}
140
141async fn handle_status(state: &AppState) -> serde_json::Value {
142    let uptime_secs = state.start_time.elapsed().as_secs();
143    let providers = state.providers.read().await;
144    let provider_count = providers.len();
145
146    let provider_health: serde_json::Value = providers
147        .iter()
148        .map(|(name, provider)| {
149            (name.clone(), serde_json::json!({
150                "healthy": provider.is_healthy()
151            }))
152        })
153        .collect::<serde_json::Map<String, serde_json::Value>>()
154        .into();
155
156    serde_json::json!({
157        "ok": true,
158        "data": {
159            "uptime_seconds": uptime_secs,
160            "provider_count": provider_count,
161            "connections": 0,
162            "providers": provider_health
163        }
164    })
165}
166
167async fn handle_config_show(state: &AppState) -> serde_json::Value {
168    let config = state.config.load();
169    match serde_json::to_value(config.as_ref()) {
170        Ok(v) => serde_json::json!({ "ok": true, "data": v }),
171        Err(e) => serde_json::json!({ "ok": false, "error": format!("Serialization error: {}", e) }),
172    }
173}
174
175async fn handle_config_reload(state: &AppState) -> serde_json::Value {
176    match crate::config::load_default_config() {
177        Ok(new_config) => {
178            state.config.store(std::sync::Arc::new(new_config));
179            serde_json::json!({ "ok": true, "data": { "message": "Config reloaded" } })
180        }
181        Err(e) => {
182            serde_json::json!({ "ok": false, "error": format!("Failed to reload config: {:#}", e) })
183        }
184    }
185}
186
187async fn handle_models(state: &AppState) -> serde_json::Value {
188    let providers = state.providers.read().await;
189    let mut all_models = Vec::new();
190
191    for (_name, provider) in providers.iter() {
192        match provider.list_models().await {
193            Ok(models) => {
194                all_models.extend(models);
195            }
196            Err(e) => {
197                tracing::warn!("Failed to list models for provider: {:#}", e);
198            }
199        }
200    }
201
202    match serde_json::to_value(&all_models) {
203        Ok(v) => serde_json::json!({ "ok": true, "data": { "models": v } }),
204        Err(e) => serde_json::json!({ "ok": false, "error": format!("Serialization error: {}", e) }),
205    }
206}
207
208async fn handle_provider_enable(name: String) -> serde_json::Value {
209    if name.is_empty() {
210        return serde_json::json!({ "ok": false, "error": "Missing provider name" });
211    }
212    // Not fully implemented yet — would require config hot-reload
213    serde_json::json!({
214        "ok": true,
215        "data": { "message": format!("Provider '{}' enable requested (requires restart to take effect)", name) }
216    })
217}
218
219async fn handle_provider_disable(name: String) -> serde_json::Value {
220    if name.is_empty() {
221        return serde_json::json!({ "ok": false, "error": "Missing provider name" });
222    }
223    // Not fully implemented yet — would require config hot-reload
224    serde_json::json!({
225        "ok": true,
226        "data": { "message": format!("Provider '{}' disable requested (requires restart to take effect)", name) }
227    })
228}
229
230async fn handle_config_set(key: String, value: serde_json::Value) -> serde_json::Value {
231    if key.is_empty() {
232        return serde_json::json!({ "ok": false, "error": "Missing config key" });
233    }
234    // Not yet implemented — would require config hot-reload and persistence
235    serde_json::json!({
236        "ok": true,
237        "data": { "message": format!("config.set for '{}' not yet implemented", key), "key": key, "value": value }
238    })
239}
240
241async fn handle_logs_tail(n: usize) -> serde_json::Value {
242    let log_path = crate::util::xdg::log_path();
243    match tokio::fs::read_to_string(&log_path).await {
244        Ok(contents) => {
245            let lines: Vec<&str> = contents.lines().collect();
246            let start = lines.len().saturating_sub(n);
247            let tail: Vec<&str> = lines[start..].to_vec();
248            serde_json::json!({
249                "ok": true,
250                "data": { "lines": tail, "count": tail.len(), "path": log_path.display().to_string() }
251            })
252        }
253        Err(e) => {
254            serde_json::json!({
255                "ok": false,
256                "error": format!("Failed to read log file at {}: {}", log_path.display(), e)
257            })
258        }
259    }
260}