aiclient_api/daemon/
control.rs1use anyhow::{Context, Result};
2use tokio::io::{AsyncReadExt, AsyncWriteExt};
3use tokio::net::UnixListener;
4
5use crate::server::state::AppState;
6
7pub async fn start_control_server(state: AppState) -> Result<()> {
8 let socket_path = crate::util::xdg::socket_path();
9
10 if let Some(parent) = socket_path.parent() {
12 std::fs::create_dir_all(parent)?;
13 }
14
15 match std::fs::remove_file(&socket_path) {
17 Ok(()) => {}
18 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
19 Err(e) => return Err(anyhow::anyhow!(e).context("Failed to remove existing socket file")),
20 }
21
22 let listener = UnixListener::bind(&socket_path)
23 .with_context(|| format!("Failed to bind Unix socket at {}", socket_path.display()))?;
24
25 tracing::info!("Control server listening on {}", socket_path.display());
26
27 loop {
28 match listener.accept().await {
29 Ok((stream, _addr)) => {
30 let state = state.clone();
31 tokio::spawn(async move {
32 if let Err(e) = handle_connection(stream, state).await {
33 tracing::warn!("Control connection error: {:#}", e);
34 }
35 });
36 }
37 Err(e) => {
38 tracing::error!("Failed to accept control connection: {:#}", e);
39 }
40 }
41 }
42}
43
44async fn handle_connection(
45 mut stream: tokio::net::UnixStream,
46 state: AppState,
47) -> Result<()> {
48 let mut len_buf = [0u8; 4];
50 stream.read_exact(&mut len_buf).await.context("Failed to read request length")?;
51 let len = u32::from_be_bytes(len_buf) as usize;
52
53 if len > 1024 * 1024 {
54 anyhow::bail!("Request too large: {} bytes", len);
55 }
56
57 let mut req_buf = vec![0u8; len];
58 stream.read_exact(&mut req_buf).await.context("Failed to read request body")?;
59
60 let request: serde_json::Value = serde_json::from_slice(&req_buf)
61 .context("Failed to parse request JSON")?;
62
63 let response = dispatch_request(request, &state).await;
64
65 let resp_bytes = serde_json::to_vec(&response)?;
67 stream.write_all(&(resp_bytes.len() as u32).to_be_bytes()).await?;
68 stream.write_all(&resp_bytes).await?;
69 stream.flush().await?;
70
71 Ok(())
72}
73
74async fn dispatch_request(
75 request: serde_json::Value,
76 state: &AppState,
77) -> serde_json::Value {
78 let method = match request.get("method").and_then(|v| v.as_str()) {
79 Some(m) => m.to_string(),
80 None => {
81 return serde_json::json!({
82 "ok": false,
83 "error": "Missing 'method' field in request"
84 });
85 }
86 };
87
88 match method.as_str() {
89 "status" => handle_status(state).await,
90 "config.show" => handle_config_show(state).await,
91 "config.reload" => handle_config_reload(state).await,
92 "models" => handle_models(state).await,
93 "provider.enable" => {
94 let name = request
95 .get("params")
96 .and_then(|p| p.get("name"))
97 .and_then(|n| n.as_str())
98 .unwrap_or("")
99 .to_string();
100 handle_provider_enable(name).await
101 }
102 "provider.disable" => {
103 let name = request
104 .get("params")
105 .and_then(|p| p.get("name"))
106 .and_then(|n| n.as_str())
107 .unwrap_or("")
108 .to_string();
109 handle_provider_disable(name).await
110 }
111 "config.set" => {
112 let params = request.get("params");
113 let key = params
114 .and_then(|p| p.get("key"))
115 .and_then(|v| v.as_str())
116 .unwrap_or("")
117 .to_string();
118 let value = params
119 .and_then(|p| p.get("value"))
120 .cloned()
121 .unwrap_or(serde_json::Value::Null);
122 handle_config_set(key, value).await
123 }
124 "logs.tail" => {
125 let n = request
126 .get("params")
127 .and_then(|p| p.get("lines"))
128 .and_then(|v| v.as_u64())
129 .unwrap_or(50) as usize;
130 handle_logs_tail(n).await
131 }
132 unknown => {
133 serde_json::json!({
134 "ok": false,
135 "error": format!("Unknown method: {}", unknown)
136 })
137 }
138 }
139}
140
141async fn handle_status(state: &AppState) -> serde_json::Value {
142 let uptime_secs = state.start_time.elapsed().as_secs();
143 let providers = state.providers.read().await;
144 let provider_count = providers.len();
145
146 let provider_health: serde_json::Value = providers
147 .iter()
148 .map(|(name, provider)| {
149 (name.clone(), serde_json::json!({
150 "healthy": provider.is_healthy()
151 }))
152 })
153 .collect::<serde_json::Map<String, serde_json::Value>>()
154 .into();
155
156 serde_json::json!({
157 "ok": true,
158 "data": {
159 "uptime_seconds": uptime_secs,
160 "provider_count": provider_count,
161 "connections": 0,
162 "providers": provider_health
163 }
164 })
165}
166
167async fn handle_config_show(state: &AppState) -> serde_json::Value {
168 let config = state.config.load();
169 match serde_json::to_value(config.as_ref()) {
170 Ok(v) => serde_json::json!({ "ok": true, "data": v }),
171 Err(e) => serde_json::json!({ "ok": false, "error": format!("Serialization error: {}", e) }),
172 }
173}
174
175async fn handle_config_reload(state: &AppState) -> serde_json::Value {
176 match crate::config::load_default_config() {
177 Ok(new_config) => {
178 state.config.store(std::sync::Arc::new(new_config));
179 serde_json::json!({ "ok": true, "data": { "message": "Config reloaded" } })
180 }
181 Err(e) => {
182 serde_json::json!({ "ok": false, "error": format!("Failed to reload config: {:#}", e) })
183 }
184 }
185}
186
187async fn handle_models(state: &AppState) -> serde_json::Value {
188 let providers = state.providers.read().await;
189 let mut all_models = Vec::new();
190
191 for (_name, provider) in providers.iter() {
192 match provider.list_models().await {
193 Ok(models) => {
194 all_models.extend(models);
195 }
196 Err(e) => {
197 tracing::warn!("Failed to list models for provider: {:#}", e);
198 }
199 }
200 }
201
202 match serde_json::to_value(&all_models) {
203 Ok(v) => serde_json::json!({ "ok": true, "data": { "models": v } }),
204 Err(e) => serde_json::json!({ "ok": false, "error": format!("Serialization error: {}", e) }),
205 }
206}
207
208async fn handle_provider_enable(name: String) -> serde_json::Value {
209 if name.is_empty() {
210 return serde_json::json!({ "ok": false, "error": "Missing provider name" });
211 }
212 serde_json::json!({
214 "ok": true,
215 "data": { "message": format!("Provider '{}' enable requested (requires restart to take effect)", name) }
216 })
217}
218
219async fn handle_provider_disable(name: String) -> serde_json::Value {
220 if name.is_empty() {
221 return serde_json::json!({ "ok": false, "error": "Missing provider name" });
222 }
223 serde_json::json!({
225 "ok": true,
226 "data": { "message": format!("Provider '{}' disable requested (requires restart to take effect)", name) }
227 })
228}
229
230async fn handle_config_set(key: String, value: serde_json::Value) -> serde_json::Value {
231 if key.is_empty() {
232 return serde_json::json!({ "ok": false, "error": "Missing config key" });
233 }
234 serde_json::json!({
236 "ok": true,
237 "data": { "message": format!("config.set for '{}' not yet implemented", key), "key": key, "value": value }
238 })
239}
240
241async fn handle_logs_tail(n: usize) -> serde_json::Value {
242 let log_path = crate::util::xdg::log_path();
243 match tokio::fs::read_to_string(&log_path).await {
244 Ok(contents) => {
245 let lines: Vec<&str> = contents.lines().collect();
246 let start = lines.len().saturating_sub(n);
247 let tail: Vec<&str> = lines[start..].to_vec();
248 serde_json::json!({
249 "ok": true,
250 "data": { "lines": tail, "count": tail.len(), "path": log_path.display().to_string() }
251 })
252 }
253 Err(e) => {
254 serde_json::json!({
255 "ok": false,
256 "error": format!("Failed to read log file at {}: {}", log_path.display(), e)
257 })
258 }
259 }
260}