1pub mod crypto;
2
3use crate::handler::{self, App};
4use crate::rpc::crypto::Cipher;
5use crate::types::*;
6use std::sync::atomic::Ordering;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tonic::Code;
10use tonic::{Request, Response, Status};
11
12pub struct RpcInit {
13 pub listen: String,
14 pub rpc_secret: Option<String>,
15 pub log: Vec<String>,
16 pub data_dir: Option<String>,
17 pub startup_argv: Vec<String>,
18 pub startup_args: serde_json::Value,
19 pub startup_requested: bool,
20}
21
22pub mod proto {
23 tonic::include_proto!("afpay");
24}
25
26use proto::af_pay_server::{AfPay, AfPayServer};
27use proto::{EncryptedRequest, EncryptedResponse};
28
29struct AfPayService {
30 cipher: Cipher,
31 config: RuntimeConfig,
32}
33
34#[tonic::async_trait]
35impl AfPay for AfPayService {
36 async fn call(
37 &self,
38 request: Request<EncryptedRequest>,
39 ) -> Result<Response<EncryptedResponse>, Status> {
40 let req = request.into_inner();
41
42 let plaintext = match self.cipher.decrypt(&req.nonce, &req.ciphertext) {
44 Ok(plaintext) => plaintext,
45 Err(_) => {
46 emit_rpc_request_log(
47 &self.config,
48 None,
49 serde_json::json!({
50 "input": serde_json::Value::Null,
51 "decode_error": "decryption failed",
52 }),
53 );
54 let status = Status::unauthenticated("decryption failed");
55 emit_rpc_response_log(&self.config, None, &[], Some(&status));
56 return Err(status);
57 }
58 };
59
60 let mut raw_input_value = serde_json::from_slice::<serde_json::Value>(&plaintext)
61 .unwrap_or(serde_json::Value::Null);
62 if let Some(object) = raw_input_value.as_object_mut() {
63 object.remove("id");
64 }
65
66 let input: Input = match serde_json::from_slice(&plaintext) {
68 Ok(input) => input,
69 Err(e) => {
70 emit_rpc_request_log(
71 &self.config,
72 None,
73 serde_json::json!({
74 "input": raw_input_value,
75 "decode_error": format!("invalid input: {e}"),
76 }),
77 );
78 let status = Status::invalid_argument(format!("invalid input: {e}"));
79 emit_rpc_response_log(&self.config, None, &[], Some(&status));
80 return Err(status);
81 }
82 };
83 let request_id = input_request_id(&input).map(|s| s.to_string());
84 emit_rpc_request_log(
85 &self.config,
86 request_id.clone(),
87 serde_json::json!({
88 "input": raw_input_value,
89 }),
90 );
91
92 if input.is_local_only() {
94 let status = Status::permission_denied("local-only operation");
95 emit_rpc_response_log(&self.config, request_id, &[], Some(&status));
96 return Err(status);
97 }
98
99 let (tx, mut rx) = mpsc::channel::<Output>(256);
101 let store = crate::store::create_storage_backend(&self.config);
102 let app = Arc::new(App::new(self.config.clone(), tx, Some(true), store));
103 app.requests_total.fetch_add(1, Ordering::Relaxed);
104
105 handler::dispatch(&app, input).await;
107
108 drop(app);
110 let mut outputs = Vec::new();
111 while let Some(out) = rx.recv().await {
112 if let Output::Log { .. } = &out {
115 let rendered = agent_first_data::cli_output(
116 &serde_json::to_value(&out).unwrap_or(serde_json::Value::Null),
117 agent_first_data::OutputFormat::Json,
118 );
119 println!("{rendered}");
120 }
121 let value = serde_json::to_value(&out).unwrap_or(serde_json::Value::Null);
122 outputs.push(value);
123 }
124
125 let response_json = match serde_json::to_vec(&outputs) {
127 Ok(response_json) => response_json,
128 Err(e) => {
129 let status = Status::internal(format!("serialize: {e}"));
130 emit_rpc_response_log(&self.config, request_id, &outputs, Some(&status));
131 return Err(status);
132 }
133 };
134
135 let (nonce, ciphertext) = match self.cipher.encrypt(&response_json) {
137 Ok(payload) => payload,
138 Err(e) => {
139 let status = Status::internal(format!("encrypt: {e}"));
140 emit_rpc_response_log(&self.config, request_id, &outputs, Some(&status));
141 return Err(status);
142 }
143 };
144
145 emit_rpc_response_log(&self.config, request_id, &outputs, None);
146
147 Ok(Response::new(EncryptedResponse { nonce, ciphertext }))
148 }
149}
150
151pub async fn run_rpc(init: RpcInit) {
152 let secret: String = match init.rpc_secret {
153 Some(s) if !s.is_empty() => s,
154 _ => {
155 let value = agent_first_data::build_cli_error(
156 "--rpc-secret is required for RPC mode",
157 Some("pass a shared secret for client authentication"),
158 );
159 let rendered =
160 agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
161 println!("{rendered}");
162 std::process::exit(1);
163 }
164 };
165
166 let cipher = Cipher::from_secret(&secret);
167
168 let resolved_dir = init
169 .data_dir
170 .unwrap_or_else(|| RuntimeConfig::default().data_dir);
171 let mut config = match RuntimeConfig::load_from_dir(&resolved_dir) {
172 Ok(c) => c,
173 Err(e) => {
174 let value = agent_first_data::build_cli_error(&e, None);
175 let rendered =
176 agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
177 println!("{rendered}");
178 std::process::exit(1);
179 }
180 };
181 if !init.log.is_empty() {
182 config.log = init.log;
183 }
184
185 if let Some(startup) = crate::config::maybe_startup_log(
187 &config.log,
188 init.startup_requested,
189 Some(init.startup_argv),
190 Some(&config),
191 init.startup_args,
192 ) {
193 let value = serde_json::to_value(&startup).unwrap_or(serde_json::Value::Null);
194 let rendered = agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
195 println!("{rendered}");
196 }
197
198 let startup_errors = crate::handler::startup_provider_validation_errors(&config).await;
199 for error_output in &startup_errors {
200 let value = serde_json::to_value(error_output).unwrap_or(serde_json::Value::Null);
201 let rendered = agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
202 println!("{rendered}");
203 }
204 if !startup_errors.is_empty() {
205 std::process::exit(1);
206 }
207
208 let service = AfPayService { cipher, config };
209
210 let addr = match init.listen.parse() {
211 Ok(a) => a,
212 Err(e) => {
213 let value = agent_first_data::build_cli_error(
214 &format!("invalid --rpc-listen address: {e}"),
215 Some("expected format: host:port (e.g. 127.0.0.1:9100)"),
216 );
217 let rendered =
218 agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
219 println!("{rendered}");
220 std::process::exit(1);
221 }
222 };
223
224 let server = tonic::transport::Server::builder()
225 .add_service(AfPayServer::new(service))
226 .serve(addr);
227
228 if let Err(e) = server.await {
229 let value = agent_first_data::build_cli_error(&format!("RPC server error: {e}"), None);
230 let rendered = agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
231 println!("{rendered}");
232 std::process::exit(1);
233 }
234}
235
/// Returns `true` when `event` is selected by at least one entry of
/// `log_filters`.
///
/// A filter selects an event when it is `*`, when it is `all`, or when it is a
/// prefix of the event name. Matching is ASCII case-insensitive on both sides,
/// surrounding whitespace in a filter is ignored, and blank filters select
/// nothing (a blank prefix would otherwise match every event). An empty filter
/// list disables all events.
fn log_event_enabled(log_filters: &[String], event: &str) -> bool {
    let event = event.as_bytes();
    log_filters
        .iter()
        .map(|filter| filter.trim())
        .filter(|filter| !filter.is_empty())
        .any(|filter| {
            filter == "*"
                || filter.eq_ignore_ascii_case("all")
                // Byte-wise prefix test: `get` yields `None` when the filter is
                // longer than the event, so no slicing can panic.
                || event
                    .get(..filter.len())
                    .map_or(false, |prefix| prefix.eq_ignore_ascii_case(filter.as_bytes()))
        })
}
245
246fn emit_rpc_request_log(
247 config: &RuntimeConfig,
248 request_id: Option<String>,
249 args: serde_json::Value,
250) {
251 emit_rpc_log(config, "rpc_request", request_id, args);
252}
253
254fn emit_rpc_response_log(
255 config: &RuntimeConfig,
256 request_id: Option<String>,
257 outputs: &[serde_json::Value],
258 status: Option<&Status>,
259) {
260 let has_output_error = outputs
261 .iter()
262 .any(|item| item.get("code").and_then(|v| v.as_str()) == Some("error"));
263 let mut args = serde_json::json!({
264 "output_count": outputs.len(),
265 "has_error": has_output_error || status.is_some(),
266 "outputs": outputs,
267 });
268 if let Some(status) = status {
269 if let Some(object) = args.as_object_mut() {
270 object.insert(
271 "grpc_error".to_string(),
272 serde_json::json!({
273 "code": grpc_code_name(status.code()),
274 "message": status.message(),
275 }),
276 );
277 }
278 }
279 emit_rpc_log(config, "rpc_response", request_id, args);
280}
281
282fn emit_rpc_log(
283 config: &RuntimeConfig,
284 event: &str,
285 request_id: Option<String>,
286 args: serde_json::Value,
287) {
288 if !log_event_enabled(&config.log, event) {
289 return;
290 }
291 let log = Output::Log {
292 event: event.to_string(),
293 request_id,
294 version: None,
295 argv: None,
296 config: None,
297 args: Some(args),
298 env: None,
299 trace: Trace::from_duration(0),
300 };
301 let rendered = agent_first_data::cli_output(
302 &serde_json::to_value(&log).unwrap_or(serde_json::Value::Null),
303 agent_first_data::OutputFormat::Json,
304 );
305 println!("{rendered}");
306}
307
308fn grpc_code_name(code: Code) -> &'static str {
309 match code {
310 Code::Ok => "ok",
311 Code::Cancelled => "cancelled",
312 Code::Unknown => "unknown",
313 Code::InvalidArgument => "invalid_argument",
314 Code::DeadlineExceeded => "deadline_exceeded",
315 Code::NotFound => "not_found",
316 Code::AlreadyExists => "already_exists",
317 Code::PermissionDenied => "permission_denied",
318 Code::ResourceExhausted => "resource_exhausted",
319 Code::FailedPrecondition => "failed_precondition",
320 Code::Aborted => "aborted",
321 Code::OutOfRange => "out_of_range",
322 Code::Unimplemented => "unimplemented",
323 Code::Internal => "internal",
324 Code::Unavailable => "unavailable",
325 Code::DataLoss => "data_loss",
326 Code::Unauthenticated => "unauthenticated",
327 }
328}
329
330fn input_request_id(input: &Input) -> Option<&str> {
331 match input {
332 Input::WalletCreate { id, .. }
333 | Input::LnWalletCreate { id, .. }
334 | Input::WalletClose { id, .. }
335 | Input::WalletList { id, .. }
336 | Input::Balance { id, .. }
337 | Input::Receive { id, .. }
338 | Input::ReceiveClaim { id, .. }
339 | Input::CashuSend { id, .. }
340 | Input::CashuReceive { id, .. }
341 | Input::Send { id, .. }
342 | Input::Restore { id, .. }
343 | Input::WalletShowSeed { id, .. }
344 | Input::HistoryList { id, .. }
345 | Input::HistoryStatus { id, .. }
346 | Input::HistoryUpdate { id, .. }
347 | Input::LimitAdd { id, .. }
348 | Input::LimitRemove { id, .. }
349 | Input::LimitList { id, .. }
350 | Input::LimitSet { id, .. }
351 | Input::WalletConfigShow { id, .. }
352 | Input::WalletConfigSet { id, .. }
353 | Input::WalletConfigTokenAdd { id, .. }
354 | Input::WalletConfigTokenRemove { id, .. } => Some(id.as_str()),
355 Input::Config(_) | Input::Version | Input::Close => None,
356 }
357}