Skip to main content

palladium_cli/commands/
msg.rs

1use clap::{Args, Subcommand};
2use schemars::JsonSchema;
3use serde::Serialize;
4use serde_json::json;
5
6use crate::client::{CliError, ControlPlaneClient, Endpoint};
7use crate::output;
8use crate::CliResult;
9
10#[derive(Subcommand, Debug, Serialize, JsonSchema)]
11#[serde(rename_all = "kebab-case")]
12pub enum MsgCommand {
13    /// Send a JSON message to an actor (Phase 5 — codec registry required).
14    Send(MsgSendArgs),
15    /// Perform a synchronous request-response call.
16    Call(MsgCallArgs),
17    /// Inspect an actor's current mailbox depth and capacity.
18    Inspect(MsgInspectArgs),
19}
20
21#[derive(Args, Debug, Serialize, JsonSchema)]
22#[serde(rename_all = "kebab-case")]
23pub struct MsgSendArgs {
24    /// Target actor path.
25    pub path: String,
26    /// JSON payload to send.
27    pub payload: String,
28}
29
30#[derive(Args, Debug, Serialize, JsonSchema)]
31#[serde(rename_all = "kebab-case")]
32pub struct MsgCallArgs {
33    /// Target actor path.
34    pub path: String,
35    /// JSON payload to send.
36    pub payload: String,
37    /// Timeout in seconds.
38    #[arg(long, default_value_t = 5)]
39    pub timeout: u64,
40}
41
42#[derive(Args, Debug, Serialize, JsonSchema)]
43#[serde(rename_all = "kebab-case")]
44pub struct MsgInspectArgs {
45    /// Target actor path.
46    pub path: String,
47    /// Output in JSON format.
48    #[arg(long)]
49    pub json: bool,
50}
51
52pub fn run(cmd: MsgCommand, endpoint: &Endpoint) -> CliResult {
53    match cmd {
54        MsgCommand::Send(_args) => {
55            eprintln!("pd msg send: actor.send not implemented (Phase 5)");
56            std::process::exit(1);
57        }
58        MsgCommand::Call(args) => {
59            let mut client = ControlPlaneClient::connect_endpoint(endpoint)?;
60            let params = json!({
61                "path": args.path,
62                "payload": args.payload,
63                "timeout_secs": args.timeout
64            });
65            let result = match client.call("msg.call", params.clone()) {
66                Ok(result) => result,
67                Err(CliError::Rpc { code, message })
68                    if code == -32000 && message.starts_with("actor not found:") =>
69                {
70                    call_remote_via_federation(endpoint, &params)?
71                }
72                Err(e) => return Err(e.into()),
73            };
74            if let Some(text) = result.get("payload_text").and_then(|v| v.as_str()) {
75                println!("{text}");
76            } else {
77                println!("{}", serde_json::to_string_pretty(&result)?);
78            }
79        }
80        MsgCommand::Inspect(args) => {
81            let mut client = ControlPlaneClient::connect_endpoint(endpoint)?;
82            let result = client.call("actor.info", json!({"path": args.path}))?;
83            if args.json {
84                println!("{}", serde_json::to_string_pretty(&result)?);
85            } else {
86                print!("{}", output::format_mailbox_info(&result));
87            }
88        }
89    }
90    Ok(())
91}
92
93fn call_remote_via_federation(endpoint: &Endpoint, params: &serde_json::Value) -> CliResultValue {
94    let path = params
95        .get("path")
96        .and_then(|v| v.as_str())
97        .ok_or_else(|| CliError::Protocol("missing path".to_string()))?;
98
99    let mut local = ControlPlaneClient::connect_endpoint(endpoint)?;
100    let policy = local.call("federation.policy.get", serde_json::Value::Null)?;
101    if !federation_policy_allows(&policy, path) {
102        return Err(CliError::Rpc {
103            code: -32000,
104            message: format!("federation policy denies actor path: {path}"),
105        }
106        .into());
107    }
108
109    let entry = local.call("federation.registry.get", json!({ "path": path }))?;
110    let owner_engine = entry
111        .get("engine_id")
112        .and_then(|v| v.as_str())
113        .ok_or_else(|| CliError::Protocol("federation.registry.get missing engine_id".into()))?;
114    let status = local.call("cluster.status", serde_json::Value::Null)?;
115    if status
116        .get("local_engine")
117        .and_then(|v| v.as_str())
118        .is_some_and(|s| s == owner_engine)
119    {
120        return Err(CliError::Rpc {
121            code: -32000,
122            message: format!("actor not found locally: {path}"),
123        }
124        .into());
125    }
126
127    let members = local.call("cluster.members", serde_json::Value::Null)?;
128    let owner_addr = members
129        .as_array()
130        .and_then(|items| {
131            items.iter().find_map(|m| {
132                if m.get("engine_id").and_then(|v| v.as_str()) == Some(owner_engine) {
133                    m.get("addr").and_then(|v| v.as_str()).map(str::to_string)
134                } else {
135                    None
136                }
137            })
138        })
139        .ok_or_else(|| CliError::Protocol(format!("owner member not found: {owner_engine}")))?;
140
141    let remote = remote_endpoint_for_member(endpoint, &owner_addr)?;
142    let mut remote_client = ControlPlaneClient::connect_endpoint(&remote)?;
143    Ok(remote_client.call("msg.call", params.clone())?)
144}
145
146type CliResultValue = Result<serde_json::Value, Box<dyn std::error::Error>>;
147
148fn remote_endpoint_for_member(current: &Endpoint, member_addr: &str) -> Result<Endpoint, CliError> {
149    let (host, port) = member_addr
150        .rsplit_once(':')
151        .ok_or_else(|| CliError::Protocol(format!("invalid member addr: {member_addr}")))?;
152    let port = port
153        .parse::<u16>()
154        .map_err(|_| CliError::Protocol(format!("invalid member port: {member_addr}")))?;
155    match current {
156        Endpoint::Tcp { tls, .. } => Ok(Endpoint::Tcp {
157            host: host.to_string(),
158            port,
159            tls: tls.clone(),
160        }),
161        Endpoint::Quic { tls, .. } => Ok(Endpoint::Quic {
162            host: host.to_string(),
163            port,
164            tls: tls.clone(),
165        }),
166        Endpoint::Unix(_) => Err(CliError::Protocol(
167            "remote msg.call forwarding requires --endpoint tcp://... or quic://...".to_string(),
168        )),
169    }
170}
171
172fn federation_policy_allows(policy: &serde_json::Value, path: &str) -> bool {
173    let mode = policy
174        .get("mode")
175        .and_then(|v| v.as_str())
176        .unwrap_or("allow-all");
177    let prefixes: Vec<&str> = policy
178        .get("prefixes")
179        .and_then(|v| v.as_array())
180        .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
181        .unwrap_or_default();
182
183    let matches_prefix = |p: &str| path == p || path.starts_with(&format!("{p}/"));
184    match mode {
185        "allow-all" => true,
186        "deny-all" => false,
187        "allow-prefixes" => prefixes.iter().any(|p| matches_prefix(p)),
188        "deny-prefixes" => !prefixes.iter().any(|p| matches_prefix(p)),
189        _ => true,
190    }
191}
192
#[cfg(test)]
mod tests {
    use super::{federation_policy_allows, remote_endpoint_for_member};
    use crate::client::{Endpoint, TlsClientConfig};
    use serde_json::json;
    use std::path::PathBuf;

    /// TLS settings with placeholder file names; the files are never opened
    /// by these tests.
    fn sample_tls() -> TlsClientConfig {
        let cert = PathBuf::from("client.crt");
        let key = PathBuf::from("client.key");
        let ca = Some(PathBuf::from("ca.crt"));
        TlsClientConfig {
            cert,
            key,
            ca,
            sni: None,
        }
    }

    /// A TCP endpoint must map to a TCP endpoint that points at the member's
    /// host and port.
    #[test]
    fn endpoint_for_member_keeps_scheme_and_tls() {
        let local = Endpoint::Tcp {
            host: "127.0.0.1".to_string(),
            port: 4100,
            tls: sample_tls(),
        };

        let Endpoint::Tcp { host, port, .. } =
            remote_endpoint_for_member(&local, "server-b:4200").unwrap()
        else {
            panic!("expected tcp endpoint");
        };

        assert_eq!(host, "server-b");
        assert_eq!(port, 4200);
    }

    /// `allow-prefixes` admits paths below a listed prefix and rejects paths
    /// outside every listed prefix.
    #[test]
    fn allow_prefix_policy_matches_descendants() {
        let policy = json!({
            "mode": "allow-prefixes",
            "prefixes": ["/user"]
        });

        let cases = [
            ("/user/core1/counter-a", true),
            ("/system/control-plane", false),
        ];
        for (path, expected) in cases {
            assert_eq!(federation_policy_allows(&policy, path), expected);
        }
    }
}