1use clap::{Args, Subcommand};
2use schemars::JsonSchema;
3use serde::Serialize;
4use serde_json::json;
5
6use crate::client::{CliError, ControlPlaneClient, Endpoint};
7use crate::output;
8use crate::CliResult;
9
10#[derive(Subcommand, Debug, Serialize, JsonSchema)]
11#[serde(rename_all = "kebab-case")]
12pub enum MsgCommand {
13 Send(MsgSendArgs),
15 Call(MsgCallArgs),
17 Inspect(MsgInspectArgs),
19}
20
21#[derive(Args, Debug, Serialize, JsonSchema)]
22#[serde(rename_all = "kebab-case")]
23pub struct MsgSendArgs {
24 pub path: String,
26 pub payload: String,
28}
29
30#[derive(Args, Debug, Serialize, JsonSchema)]
31#[serde(rename_all = "kebab-case")]
32pub struct MsgCallArgs {
33 pub path: String,
35 pub payload: String,
37 #[arg(long, default_value_t = 5)]
39 pub timeout: u64,
40}
41
42#[derive(Args, Debug, Serialize, JsonSchema)]
43#[serde(rename_all = "kebab-case")]
44pub struct MsgInspectArgs {
45 pub path: String,
47 #[arg(long)]
49 pub json: bool,
50}
51
52pub fn run(cmd: MsgCommand, endpoint: &Endpoint) -> CliResult {
53 match cmd {
54 MsgCommand::Send(_args) => {
55 eprintln!("pd msg send: actor.send not implemented (Phase 5)");
56 std::process::exit(1);
57 }
58 MsgCommand::Call(args) => {
59 let mut client = ControlPlaneClient::connect_endpoint(endpoint)?;
60 let params = json!({
61 "path": args.path,
62 "payload": args.payload,
63 "timeout_secs": args.timeout
64 });
65 let result = match client.call("msg.call", params.clone()) {
66 Ok(result) => result,
67 Err(CliError::Rpc { code, message })
68 if code == -32000 && message.starts_with("actor not found:") =>
69 {
70 call_remote_via_federation(endpoint, ¶ms)?
71 }
72 Err(e) => return Err(e.into()),
73 };
74 if let Some(text) = result.get("payload_text").and_then(|v| v.as_str()) {
75 println!("{text}");
76 } else {
77 println!("{}", serde_json::to_string_pretty(&result)?);
78 }
79 }
80 MsgCommand::Inspect(args) => {
81 let mut client = ControlPlaneClient::connect_endpoint(endpoint)?;
82 let result = client.call("actor.info", json!({"path": args.path}))?;
83 if args.json {
84 println!("{}", serde_json::to_string_pretty(&result)?);
85 } else {
86 print!("{}", output::format_mailbox_info(&result));
87 }
88 }
89 }
90 Ok(())
91}
92
93fn call_remote_via_federation(endpoint: &Endpoint, params: &serde_json::Value) -> CliResultValue {
94 let path = params
95 .get("path")
96 .and_then(|v| v.as_str())
97 .ok_or_else(|| CliError::Protocol("missing path".to_string()))?;
98
99 let mut local = ControlPlaneClient::connect_endpoint(endpoint)?;
100 let policy = local.call("federation.policy.get", serde_json::Value::Null)?;
101 if !federation_policy_allows(&policy, path) {
102 return Err(CliError::Rpc {
103 code: -32000,
104 message: format!("federation policy denies actor path: {path}"),
105 }
106 .into());
107 }
108
109 let entry = local.call("federation.registry.get", json!({ "path": path }))?;
110 let owner_engine = entry
111 .get("engine_id")
112 .and_then(|v| v.as_str())
113 .ok_or_else(|| CliError::Protocol("federation.registry.get missing engine_id".into()))?;
114 let status = local.call("cluster.status", serde_json::Value::Null)?;
115 if status
116 .get("local_engine")
117 .and_then(|v| v.as_str())
118 .is_some_and(|s| s == owner_engine)
119 {
120 return Err(CliError::Rpc {
121 code: -32000,
122 message: format!("actor not found locally: {path}"),
123 }
124 .into());
125 }
126
127 let members = local.call("cluster.members", serde_json::Value::Null)?;
128 let owner_addr = members
129 .as_array()
130 .and_then(|items| {
131 items.iter().find_map(|m| {
132 if m.get("engine_id").and_then(|v| v.as_str()) == Some(owner_engine) {
133 m.get("addr").and_then(|v| v.as_str()).map(str::to_string)
134 } else {
135 None
136 }
137 })
138 })
139 .ok_or_else(|| CliError::Protocol(format!("owner member not found: {owner_engine}")))?;
140
141 let remote = remote_endpoint_for_member(endpoint, &owner_addr)?;
142 let mut remote_client = ControlPlaneClient::connect_endpoint(&remote)?;
143 Ok(remote_client.call("msg.call", params.clone())?)
144}
145
146type CliResultValue = Result<serde_json::Value, Box<dyn std::error::Error>>;
147
148fn remote_endpoint_for_member(current: &Endpoint, member_addr: &str) -> Result<Endpoint, CliError> {
149 let (host, port) = member_addr
150 .rsplit_once(':')
151 .ok_or_else(|| CliError::Protocol(format!("invalid member addr: {member_addr}")))?;
152 let port = port
153 .parse::<u16>()
154 .map_err(|_| CliError::Protocol(format!("invalid member port: {member_addr}")))?;
155 match current {
156 Endpoint::Tcp { tls, .. } => Ok(Endpoint::Tcp {
157 host: host.to_string(),
158 port,
159 tls: tls.clone(),
160 }),
161 Endpoint::Quic { tls, .. } => Ok(Endpoint::Quic {
162 host: host.to_string(),
163 port,
164 tls: tls.clone(),
165 }),
166 Endpoint::Unix(_) => Err(CliError::Protocol(
167 "remote msg.call forwarding requires --endpoint tcp://... or quic://...".to_string(),
168 )),
169 }
170}
171
172fn federation_policy_allows(policy: &serde_json::Value, path: &str) -> bool {
173 let mode = policy
174 .get("mode")
175 .and_then(|v| v.as_str())
176 .unwrap_or("allow-all");
177 let prefixes: Vec<&str> = policy
178 .get("prefixes")
179 .and_then(|v| v.as_array())
180 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
181 .unwrap_or_default();
182
183 let matches_prefix = |p: &str| path == p || path.starts_with(&format!("{p}/"));
184 match mode {
185 "allow-all" => true,
186 "deny-all" => false,
187 "allow-prefixes" => prefixes.iter().any(|p| matches_prefix(p)),
188 "deny-prefixes" => !prefixes.iter().any(|p| matches_prefix(p)),
189 _ => true,
190 }
191}
192
193#[cfg(test)]
194mod tests {
195 use super::{federation_policy_allows, remote_endpoint_for_member};
196 use crate::client::{Endpoint, TlsClientConfig};
197 use serde_json::json;
198 use std::path::PathBuf;
199
200 fn sample_tls() -> TlsClientConfig {
201 TlsClientConfig {
202 cert: PathBuf::from("client.crt"),
203 key: PathBuf::from("client.key"),
204 ca: Some(PathBuf::from("ca.crt")),
205 sni: None,
206 }
207 }
208
209 #[test]
210 fn endpoint_for_member_keeps_scheme_and_tls() {
211 let endpoint = Endpoint::Tcp {
212 host: "127.0.0.1".to_string(),
213 port: 4100,
214 tls: sample_tls(),
215 };
216 let remote = remote_endpoint_for_member(&endpoint, "server-b:4200").unwrap();
217 match remote {
218 Endpoint::Tcp { host, port, .. } => {
219 assert_eq!(host, "server-b");
220 assert_eq!(port, 4200);
221 }
222 _ => panic!("expected tcp endpoint"),
223 }
224 }
225
226 #[test]
227 fn allow_prefix_policy_matches_descendants() {
228 let policy = json!({
229 "mode": "allow-prefixes",
230 "prefixes": ["/user"]
231 });
232 assert!(federation_policy_allows(&policy, "/user/core1/counter-a"));
233 assert!(!federation_policy_allows(&policy, "/system/control-plane"));
234 }
235}