Skip to main content

agent_first_pay/rpc/
mod.rs

1pub mod crypto;
2
3use crate::handler::{self, App};
4use crate::rpc::crypto::Cipher;
5use crate::types::*;
6use std::sync::atomic::Ordering;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tonic::Code;
10use tonic::{Request, Response, Status};
11
12pub struct RpcInit {
13    pub listen: String,
14    pub rpc_secret: Option<String>,
15    pub log: Vec<String>,
16    pub data_dir: Option<String>,
17    pub startup_argv: Vec<String>,
18    pub startup_args: serde_json::Value,
19    pub startup_requested: bool,
20}
21
22pub mod proto {
23    tonic::include_proto!("afpay");
24}
25
26use proto::af_pay_server::{AfPay, AfPayServer};
27use proto::{EncryptedRequest, EncryptedResponse};
28
29struct AfPayService {
30    cipher: Cipher,
31    config: RuntimeConfig,
32}
33
34#[tonic::async_trait]
35impl AfPay for AfPayService {
36    async fn call(
37        &self,
38        request: Request<EncryptedRequest>,
39    ) -> Result<Response<EncryptedResponse>, Status> {
40        let req = request.into_inner();
41
42        // Decrypt request
43        let plaintext = match self.cipher.decrypt(&req.nonce, &req.ciphertext) {
44            Ok(plaintext) => plaintext,
45            Err(_) => {
46                emit_rpc_request_log(
47                    &self.config,
48                    None,
49                    serde_json::json!({
50                        "input": serde_json::Value::Null,
51                        "decode_error": "decryption failed",
52                    }),
53                );
54                let status = Status::unauthenticated("decryption failed");
55                emit_rpc_response_log(&self.config, None, &[], Some(&status));
56                return Err(status);
57            }
58        };
59
60        let mut raw_input_value = serde_json::from_slice::<serde_json::Value>(&plaintext)
61            .unwrap_or(serde_json::Value::Null);
62        if let Some(object) = raw_input_value.as_object_mut() {
63            object.remove("id");
64        }
65
66        // Parse Input
67        let input: Input = match serde_json::from_slice(&plaintext) {
68            Ok(input) => input,
69            Err(e) => {
70                emit_rpc_request_log(
71                    &self.config,
72                    None,
73                    serde_json::json!({
74                        "input": raw_input_value,
75                        "decode_error": format!("invalid input: {e}"),
76                    }),
77                );
78                let status = Status::invalid_argument(format!("invalid input: {e}"));
79                emit_rpc_response_log(&self.config, None, &[], Some(&status));
80                return Err(status);
81            }
82        };
83        let request_id = input_request_id(&input).map(|s| s.to_string());
84        emit_rpc_request_log(
85            &self.config,
86            request_id.clone(),
87            serde_json::json!({
88                "input": raw_input_value,
89            }),
90        );
91
92        // Block local-only operations over RPC
93        if input.is_local_only() {
94            let status = Status::permission_denied("local-only operation");
95            emit_rpc_response_log(&self.config, request_id, &[], Some(&status));
96            return Err(status);
97        }
98
99        // Create per-request channel and App
100        let (tx, mut rx) = mpsc::channel::<Output>(256);
101        let store = crate::store::create_storage_backend(&self.config);
102        let app = Arc::new(App::new(self.config.clone(), tx, Some(true), store));
103        app.requests_total.fetch_add(1, Ordering::Relaxed);
104
105        // Dispatch
106        handler::dispatch(&app, input).await;
107
108        // Drop app to close the sender side, then collect all outputs
109        drop(app);
110        let mut outputs = Vec::new();
111        while let Some(out) = rx.recv().await {
112            // Mirror server-side log events to rpc daemon stdout so operators can
113            // observe request flow in long-running rpc mode.
114            if let Output::Log { .. } = &out {
115                let rendered = agent_first_data::cli_output(
116                    &serde_json::to_value(&out).unwrap_or(serde_json::Value::Null),
117                    agent_first_data::OutputFormat::Json,
118                );
119                println!("{rendered}");
120            }
121            let value = serde_json::to_value(&out).unwrap_or(serde_json::Value::Null);
122            outputs.push(value);
123        }
124
125        // Serialize outputs as JSON array
126        let response_json = match serde_json::to_vec(&outputs) {
127            Ok(response_json) => response_json,
128            Err(e) => {
129                let status = Status::internal(format!("serialize: {e}"));
130                emit_rpc_response_log(&self.config, request_id, &outputs, Some(&status));
131                return Err(status);
132            }
133        };
134
135        // Encrypt response
136        let (nonce, ciphertext) = match self.cipher.encrypt(&response_json) {
137            Ok(payload) => payload,
138            Err(e) => {
139                let status = Status::internal(format!("encrypt: {e}"));
140                emit_rpc_response_log(&self.config, request_id, &outputs, Some(&status));
141                return Err(status);
142            }
143        };
144
145        emit_rpc_response_log(&self.config, request_id, &outputs, None);
146
147        Ok(Response::new(EncryptedResponse { nonce, ciphertext }))
148    }
149}
150
151pub async fn run_rpc(init: RpcInit) {
152    let secret: String = match init.rpc_secret {
153        Some(s) if !s.is_empty() => s,
154        _ => {
155            let value = agent_first_data::build_cli_error(
156                "--rpc-secret is required for RPC mode",
157                Some("pass a shared secret for client authentication"),
158            );
159            let rendered =
160                agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
161            println!("{rendered}");
162            std::process::exit(1);
163        }
164    };
165
166    let cipher = Cipher::from_secret(&secret);
167
168    let resolved_dir = init
169        .data_dir
170        .unwrap_or_else(|| RuntimeConfig::default().data_dir);
171    let mut config = match RuntimeConfig::load_from_dir(&resolved_dir) {
172        Ok(c) => c,
173        Err(e) => {
174            let value = agent_first_data::build_cli_error(&e, None);
175            let rendered =
176                agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
177            println!("{rendered}");
178            std::process::exit(1);
179        }
180    };
181    if !init.log.is_empty() {
182        config.log = init.log;
183    }
184
185    // Emit startup log
186    if let Some(startup) = crate::config::maybe_startup_log(
187        &config.log,
188        init.startup_requested,
189        Some(init.startup_argv),
190        Some(&config),
191        init.startup_args,
192    ) {
193        let value = serde_json::to_value(&startup).unwrap_or(serde_json::Value::Null);
194        let rendered = agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
195        println!("{rendered}");
196    }
197
198    let startup_errors = crate::handler::startup_provider_validation_errors(&config).await;
199    for error_output in &startup_errors {
200        let value = serde_json::to_value(error_output).unwrap_or(serde_json::Value::Null);
201        let rendered = agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
202        println!("{rendered}");
203    }
204    if !startup_errors.is_empty() {
205        std::process::exit(1);
206    }
207
208    let service = AfPayService { cipher, config };
209
210    let addr = match init.listen.parse() {
211        Ok(a) => a,
212        Err(e) => {
213            let value = agent_first_data::build_cli_error(
214                &format!("invalid --rpc-listen address: {e}"),
215                Some("expected format: host:port (e.g. 127.0.0.1:9100)"),
216            );
217            let rendered =
218                agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
219            println!("{rendered}");
220            std::process::exit(1);
221        }
222    };
223
224    let server = tonic::transport::Server::builder()
225        .add_service(AfPayServer::new(service))
226        .serve(addr);
227
228    if let Err(e) = server.await {
229        let value = agent_first_data::build_cli_error(&format!("RPC server error: {e}"), None);
230        let rendered = agent_first_data::cli_output(&value, agent_first_data::OutputFormat::Json);
231        println!("{rendered}");
232        std::process::exit(1);
233    }
234}
235
236fn log_event_enabled(log_filters: &[String], event: &str) -> bool {
237    if log_filters.is_empty() {
238        return false;
239    }
240    let ev = event.to_ascii_lowercase();
241    log_filters
242        .iter()
243        .any(|f| f == "*" || f == "all" || ev.starts_with(f.as_str()))
244}
245
246fn emit_rpc_request_log(
247    config: &RuntimeConfig,
248    request_id: Option<String>,
249    args: serde_json::Value,
250) {
251    emit_rpc_log(config, "rpc_request", request_id, args);
252}
253
254fn emit_rpc_response_log(
255    config: &RuntimeConfig,
256    request_id: Option<String>,
257    outputs: &[serde_json::Value],
258    status: Option<&Status>,
259) {
260    let has_output_error = outputs
261        .iter()
262        .any(|item| item.get("code").and_then(|v| v.as_str()) == Some("error"));
263    let mut args = serde_json::json!({
264        "output_count": outputs.len(),
265        "has_error": has_output_error || status.is_some(),
266        "outputs": outputs,
267    });
268    if let Some(status) = status {
269        if let Some(object) = args.as_object_mut() {
270            object.insert(
271                "grpc_error".to_string(),
272                serde_json::json!({
273                    "code": grpc_code_name(status.code()),
274                    "message": status.message(),
275                }),
276            );
277        }
278    }
279    emit_rpc_log(config, "rpc_response", request_id, args);
280}
281
282fn emit_rpc_log(
283    config: &RuntimeConfig,
284    event: &str,
285    request_id: Option<String>,
286    args: serde_json::Value,
287) {
288    if !log_event_enabled(&config.log, event) {
289        return;
290    }
291    let log = Output::Log {
292        event: event.to_string(),
293        request_id,
294        version: None,
295        argv: None,
296        config: None,
297        args: Some(args),
298        env: None,
299        trace: Trace::from_duration(0),
300    };
301    let rendered = agent_first_data::cli_output(
302        &serde_json::to_value(&log).unwrap_or(serde_json::Value::Null),
303        agent_first_data::OutputFormat::Json,
304    );
305    println!("{rendered}");
306}
307
308fn grpc_code_name(code: Code) -> &'static str {
309    match code {
310        Code::Ok => "ok",
311        Code::Cancelled => "cancelled",
312        Code::Unknown => "unknown",
313        Code::InvalidArgument => "invalid_argument",
314        Code::DeadlineExceeded => "deadline_exceeded",
315        Code::NotFound => "not_found",
316        Code::AlreadyExists => "already_exists",
317        Code::PermissionDenied => "permission_denied",
318        Code::ResourceExhausted => "resource_exhausted",
319        Code::FailedPrecondition => "failed_precondition",
320        Code::Aborted => "aborted",
321        Code::OutOfRange => "out_of_range",
322        Code::Unimplemented => "unimplemented",
323        Code::Internal => "internal",
324        Code::Unavailable => "unavailable",
325        Code::DataLoss => "data_loss",
326        Code::Unauthenticated => "unauthenticated",
327    }
328}
329
330fn input_request_id(input: &Input) -> Option<&str> {
331    match input {
332        Input::WalletCreate { id, .. }
333        | Input::LnWalletCreate { id, .. }
334        | Input::WalletClose { id, .. }
335        | Input::WalletList { id, .. }
336        | Input::Balance { id, .. }
337        | Input::Receive { id, .. }
338        | Input::ReceiveClaim { id, .. }
339        | Input::CashuSend { id, .. }
340        | Input::CashuReceive { id, .. }
341        | Input::Send { id, .. }
342        | Input::Restore { id, .. }
343        | Input::WalletShowSeed { id, .. }
344        | Input::HistoryList { id, .. }
345        | Input::HistoryStatus { id, .. }
346        | Input::HistoryUpdate { id, .. }
347        | Input::LimitAdd { id, .. }
348        | Input::LimitRemove { id, .. }
349        | Input::LimitList { id, .. }
350        | Input::LimitSet { id, .. }
351        | Input::WalletConfigShow { id, .. }
352        | Input::WalletConfigSet { id, .. }
353        | Input::WalletConfigTokenAdd { id, .. }
354        | Input::WalletConfigTokenRemove { id, .. } => Some(id.as_str()),
355        Input::Config(_) | Input::Version | Input::Close => None,
356    }
357}