1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
use std::ffi::OsString;
use std::path::Path;
use anyhow::{Context, Result, anyhow, bail};
use interprocess::local_socket::tokio::Stream;
use interprocess::local_socket::traits::tokio::Stream as _;
use interprocess::local_socket::{GenericFilePath, ToFsName};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use crate::model::{ProcessSnapshot, RuntimePaths};
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Request {
Ping,
Ps,
Down {
timeout_seconds: Option<u64>,
},
/// Stop the listed services. Empty list = stop all.
Stop {
services: Vec<String>,
},
/// Start the listed services. Empty list = start all.
Start {
services: Vec<String>,
},
/// Restart the listed services. Empty list = restart all.
Restart {
services: Vec<String>,
},
/// Send a signal to the listed services. Empty list = all.
Kill {
services: Vec<String>,
signal: i32,
},
/// Remove processes not in the keep list.
RemoveOrphans {
keep: Vec<String>,
},
/// Re-read the daemon's config files from disk and reconcile running
/// processes against the new definition. Stops and re-spawns services
/// whose `config_hash` has changed, spawns newly-added services, and —
/// when `remove_orphans` is set — stops and drops services that have
/// been removed from the config. Without `remove_orphans`, removed
/// services are left running and logged as orphans. `force_recreate`
/// classifies every still-present service as `changed` regardless of
/// hash; `no_recreate` does the opposite, keeping hash-diverged
/// services untouched. `no_start` inserts new/changed process entries
/// but leaves them in `NotStarted` instead of `Pending` so the
/// supervisor won't auto-spawn them.
Reload {
force_recreate: bool,
no_recreate: bool,
remove_orphans: bool,
no_start: bool,
},
/// Query whether a service is currently known to the daemon and whether
/// any of its replicas is in the Running state. Used by `exec` to
/// preflight-check before spawning a one-off command in the service's
/// environment.
ServiceRunState {
name: String,
},
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Response {
Pong {
pid: u32,
instance: String,
},
Ps {
pid: u32,
instance: String,
processes: Vec<ProcessSnapshot>,
},
Ack {
message: String,
},
/// Reply to `ServiceRunState`. `known` is whether the service is in the
/// daemon's process map at all; `any_running` is whether any replica is
/// currently in the Running state.
ServiceRunState {
known: bool,
any_running: bool,
},
Error {
message: String,
},
}
/// Default timeout for a single IPC round-trip. Local sockets are fast; if
/// the daemon hasn't responded in a few seconds it's almost certainly hung.
/// Override via `DECOMPOSE_IPC_TIMEOUT_MS` (default 5000ms) — see
/// [`crate::tuning`].
pub async fn send_request(paths: &RuntimePaths, request: Request) -> Result<Response> {
tokio::time::timeout(
crate::tuning::ipc_timeout(),
send_request_inner(paths, request),
)
.await
.context("IPC request timed out — daemon may be unresponsive")?
}
async fn send_request_inner(paths: &RuntimePaths, request: Request) -> Result<Response> {
let socket_name = to_socket_name(&paths.socket)?;
let stream = Stream::connect(socket_name)
.await
.with_context(|| format!("failed to connect to {}", paths.socket.display()))?;
let (read_half, mut write_half) = tokio::io::split(stream);
let payload = serde_json::to_string(&request)?;
write_half.write_all(payload.as_bytes()).await?;
write_half.write_all(b"\n").await?;
write_half.flush().await?;
let mut reader = BufReader::new(read_half);
let mut line = String::new();
let n = reader.read_line(&mut line).await?;
if n == 0 {
bail!("daemon closed the connection");
}
let response: Response = serde_json::from_str(line.trim())?;
Ok(response)
}
pub fn to_socket_name(path: &Path) -> Result<interprocess::local_socket::Name<'static>> {
let raw: OsString = path.as_os_str().to_os_string();
let utf = raw
.into_string()
.map_err(|_| anyhow!("socket path contains invalid UTF-8: {}", path.display()))?;
utf.to_fs_name::<GenericFilePath>()
.context("failed to create local socket name")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn request_rejects_plain_garbage() {
// The daemon feeds each line from the wire to `serde_json::from_str`
// and surfaces failures as `invalid request json`. A random
// non-JSON blob must not silently parse into one of the Request
// variants.
let err = serde_json::from_str::<Request>("not json at all").unwrap_err();
assert!(
err.to_string().to_lowercase().contains("expected"),
"unexpected error: {err}"
);
}
#[test]
fn request_rejects_unknown_variant() {
// Adjacently-tagged enum with `#[serde(tag = "type")]`: a message
// with a known-looking shape but an unknown `type` tag must fail
// parsing rather than defaulting to Ping or similar.
let err = serde_json::from_str::<Request>(r#"{"type":"no_such_command"}"#).unwrap_err();
assert!(
err.to_string().contains("unknown variant"),
"unexpected error: {err}"
);
}
#[test]
fn request_rejects_missing_required_fields() {
// `Kill` has required `services` and `signal` fields. Dropping the
// signal must surface a parse error — not fall back to 0, which
// would silently send signal 0 (no-op) to every process.
let err = serde_json::from_str::<Request>(r#"{"type":"kill","services":[]}"#).unwrap_err();
assert!(
err.to_string().contains("signal"),
"unexpected error: {err}"
);
}
#[test]
fn request_rejects_wrong_field_types() {
// `Down.timeout_seconds` is `Option<u64>`. Passing a negative
// number (or a string) must fail rather than coercing.
let err =
serde_json::from_str::<Request>(r#"{"type":"down","timeout_seconds":-5}"#).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("invalid") || msg.contains("out of range") || msg.contains("negative"),
"unexpected error: {msg}"
);
}
#[test]
fn request_round_trips_through_json() {
// Sanity: the wire encoding matches what the daemon reads. Helps
// catch accidental rename_all / tag drift.
let req = Request::Kill {
services: vec!["api".to_string()],
signal: 15,
};
let encoded = serde_json::to_string(&req).unwrap();
let decoded: Request = serde_json::from_str(&encoded).unwrap();
match decoded {
Request::Kill { services, signal } => {
assert_eq!(services, vec!["api".to_string()]);
assert_eq!(signal, 15);
}
other => panic!("wrong variant round-tripped: {other:?}"),
}
}
}