Skip to main content

agent_first_pay/mode/
mod.rs

1use crate::args::Mode;
2use agent_first_data::OutputFormat;
3
4#[cfg(feature = "interactive")]
5use crate::args::{InteractiveFrontend, InteractiveInit};
6#[cfg(feature = "interactive")]
7use crate::config::VERSION;
8#[cfg(feature = "interactive")]
9use crate::handler::{self, App};
10#[cfg(all(feature = "interactive", feature = "rpc"))]
11use crate::provider::remote;
12#[cfg(feature = "interactive")]
13use crate::types::*;
14#[cfg(feature = "interactive")]
15use std::io::Write as _;
16#[cfg(feature = "interactive")]
17use std::sync::Arc;
18#[cfg(feature = "interactive")]
19use tokio::sync::mpsc;
20
21mod cli;
22#[cfg(feature = "backup")]
23mod data;
24#[cfg(feature = "interactive")]
25mod interactive;
26mod pipe;
27#[cfg(feature = "rest")]
28pub mod rest;
29#[cfg(feature = "rpc")]
30pub mod rpc;
31#[cfg(feature = "interactive")]
32mod session;
33#[cfg(feature = "interactive")]
34mod tui;
35
36#[cfg(feature = "interactive")]
37use session::{
38    banner_hint, mode_name, render_output, CommandCompleter, SessionBackend, SessionState,
39    OUTPUT_CHANNEL_CAPACITY,
40};
41
42#[cfg(feature = "interactive")]
43struct InteractiveSessionRuntime {
44    frontend: InteractiveFrontend,
45    state: SessionState,
46    backend: SessionBackend,
47    completer: CommandCompleter,
48    history_path: String,
49    intro_messages: Vec<String>,
50}
51
52pub async fn run(mode: Mode) {
53    match mode {
54        Mode::Cli(req) => {
55            if req.rpc_endpoint.is_some() {
56                #[cfg(feature = "rpc")]
57                {
58                    cli::run_remote(*req).await;
59                }
60                #[cfg(not(feature = "rpc"))]
61                {
62                    cli::emit_cli_error(
63                        "--rpc-endpoint requires feature 'rpc'; rebuild with: cargo build --features rpc",
64                        req.output,
65                    );
66                    std::process::exit(1);
67                }
68            } else {
69                cli::run(*req).await;
70            }
71        }
72        Mode::Pipe(init) => pipe::run(init).await,
73        Mode::Interactive(_init) => {
74            #[cfg(feature = "interactive")]
75            {
76                run_interactive(_init).await;
77            }
78            #[cfg(not(feature = "interactive"))]
79            {
80                cli::emit_cli_error(
81                    "interactive and tui modes require feature 'interactive'; rebuild with: cargo build --features interactive",
82                    OutputFormat::Json,
83                );
84                std::process::exit(1);
85            }
86        }
87        Mode::Rpc(_init) => {
88            #[cfg(feature = "rpc")]
89            {
90                rpc::run_rpc(_init).await;
91            }
92            #[cfg(not(feature = "rpc"))]
93            {
94                cli::emit_cli_error(
95                    "rpc mode requires feature 'rpc'; rebuild with: cargo build --features rpc",
96                    OutputFormat::Json,
97                );
98                std::process::exit(1);
99            }
100        }
101        #[cfg(feature = "rest")]
102        Mode::Rest(init) => rest::run_rest(init).await,
103        Mode::Data(_op) => {
104            #[cfg(feature = "backup")]
105            {
106                data::run_data(_op).await;
107            }
108            #[cfg(not(feature = "backup"))]
109            {
110                cli::emit_cli_error(
111                    "backup/restore requires feature 'backup'; rebuild with: cargo build --features backup",
112                    OutputFormat::Json,
113                );
114                std::process::exit(1);
115            }
116        }
117    }
118}
119
120#[cfg(feature = "interactive")]
121async fn run_interactive(init: InteractiveInit) {
122    let InteractiveInit {
123        frontend,
124        output,
125        log,
126        data_dir,
127        rpc_endpoint,
128        rpc_secret,
129    } = init;
130
131    let runtime = if let Some(endpoint) = rpc_endpoint {
132        #[cfg(feature = "rpc")]
133        {
134            bootstrap_remote_session(
135                frontend,
136                output,
137                &log,
138                data_dir.as_deref(),
139                &endpoint,
140                rpc_secret.as_deref(),
141            )
142            .await
143        }
144        #[cfg(not(feature = "rpc"))]
145        {
146            let _ = (endpoint, rpc_secret);
147            cli::emit_cli_error(
148                "--rpc-endpoint requires feature 'rpc'; rebuild with: cargo build --features rpc",
149                output,
150            );
151            return;
152        }
153    } else {
154        bootstrap_local_session(frontend, output, &log, data_dir).await
155    };
156
157    let Some(runtime) = runtime else {
158        return;
159    };
160
161    match frontend {
162        InteractiveFrontend::Interactive => interactive::run_interactive_ui(runtime).await,
163        InteractiveFrontend::Tui => tui::run_tui_ui(runtime).await,
164    }
165}
166
167#[cfg(feature = "interactive")]
168async fn bootstrap_local_session(
169    frontend: InteractiveFrontend,
170    output: OutputFormat,
171    log: &[String],
172    data_dir: Option<String>,
173) -> Option<InteractiveSessionRuntime> {
174    let resolved_dir = data_dir.unwrap_or_else(|| RuntimeConfig::default().data_dir);
175    let mut config = match RuntimeConfig::load_from_dir(&resolved_dir) {
176        Ok(config) => config,
177        Err(error) => {
178            let _ = writeln!(std::io::stdout(), "config error: {error}");
179            return None;
180        }
181    };
182
183    let data_dir_owned = config.data_dir.clone();
184    let log_filters = agent_first_data::cli_parse_log_filters(log);
185    config.log = log_filters.clone();
186
187    let mut intro_messages = Vec::new();
188    if let Some(startup) = crate::config::maybe_startup_log(
189        &log_filters,
190        false,
191        None,
192        Some(&config),
193        serde_json::json!({
194            "mode": mode_name(frontend),
195            "backend": "local",
196            "data_dir": config.data_dir,
197        }),
198    ) {
199        intro_messages.push(render_output(&startup, output));
200    }
201
202    let startup_errors = handler::startup_provider_validation_errors(&config).await;
203    for error_output in &startup_errors {
204        intro_messages.push(render_output(error_output, output));
205    }
206    if !startup_errors.is_empty() {
207        for message in intro_messages {
208            let _ = writeln!(std::io::stdout(), "{message}");
209        }
210        return None;
211    }
212
213    let (tx, rx) = mpsc::channel::<Output>(OUTPUT_CHANNEL_CAPACITY);
214    let store = crate::store::create_storage_backend(&config);
215    let app = Arc::new(App::new(config, tx, None, store));
216    let store_ref = app.store.clone();
217    let state = SessionState::new(
218        data_dir_owned.clone(),
219        output,
220        log_filters,
221        store_ref.clone(),
222    );
223    let completer = CommandCompleter::new(data_dir_owned.clone(), store_ref);
224
225    intro_messages.push(format!("afpay v{VERSION} {} mode", mode_name(frontend)));
226    intro_messages.push(banner_hint(frontend).to_string());
227
228    Some(InteractiveSessionRuntime {
229        frontend,
230        state,
231        backend: SessionBackend::Local { app, rx },
232        completer,
233        history_path: format!("{data_dir_owned}/.afpay_history"),
234        intro_messages,
235    })
236}
237
238#[cfg(all(feature = "interactive", feature = "rpc"))]
239async fn bootstrap_remote_session(
240    frontend: InteractiveFrontend,
241    output: OutputFormat,
242    log: &[String],
243    data_dir: Option<&str>,
244    endpoint: &str,
245    rpc_secret: Option<&str>,
246) -> Option<InteractiveSessionRuntime> {
247    let (endpoint, secret) = remote::require_remote_args(Some(endpoint), rpc_secret, output);
248    let log_filters = agent_first_data::cli_parse_log_filters(log);
249    let resolved_dir = data_dir
250        .map(ToString::to_string)
251        .unwrap_or_else(|| RuntimeConfig::default().data_dir);
252    let mut local_config = match RuntimeConfig::load_from_dir(&resolved_dir) {
253        Ok(config) => config,
254        Err(error) => {
255            let _ = writeln!(std::io::stdout(), "config error: {error}");
256            return None;
257        }
258    };
259    local_config.log = log_filters.clone();
260
261    let mut intro_messages = Vec::new();
262    if let Some(startup) = crate::config::maybe_startup_log(
263        &log_filters,
264        false,
265        None,
266        Some(&local_config),
267        serde_json::json!({
268            "mode": mode_name(frontend),
269            "backend": "remote",
270            "rpc_endpoint": endpoint,
271            "data_dir": local_config.data_dir,
272        }),
273    ) {
274        intro_messages.push(render_output(&startup, output));
275    }
276
277    let ping_outputs = remote::rpc_call(endpoint, secret, &Input::Version).await;
278    for value in &ping_outputs {
279        if value.get("code").and_then(|v| v.as_str()) == Some("error") {
280            let error = Output::Error {
281                id: None,
282                error_code: "provider_unreachable".to_string(),
283                error: format!(
284                    "remote version check failed: {}",
285                    value
286                        .get("error")
287                        .and_then(|v| v.as_str())
288                        .unwrap_or("unknown error")
289                ),
290                hint: value
291                    .get("hint")
292                    .and_then(|v| v.as_str())
293                    .map(|value| value.to_string()),
294                retryable: true,
295                trace: Trace::from_duration(0),
296            };
297            let _ = writeln!(std::io::stdout(), "{}", render_output(&error, output));
298            return None;
299        }
300        if value.get("code").and_then(|v| v.as_str()) == Some("version") {
301            let remote_version = value
302                .get("version")
303                .and_then(|v| v.as_str())
304                .unwrap_or("unknown");
305            if remote_version != VERSION {
306                let error = Output::Error {
307                    id: None,
308                    error_code: "version_mismatch".to_string(),
309                    error: format!("version mismatch: local v{VERSION}, remote v{remote_version}"),
310                    hint: Some("upgrade both client and server to the same version".to_string()),
311                    retryable: false,
312                    trace: Trace::from_duration(0),
313                };
314                let _ = writeln!(std::io::stdout(), "{}", render_output(&error, output));
315                return None;
316            }
317        }
318    }
319
320    let store_ref = crate::store::create_storage_backend(&local_config).map(Arc::new);
321    let state = SessionState::new(
322        local_config.data_dir.clone(),
323        output,
324        log_filters,
325        store_ref.clone(),
326    );
327    let completer = CommandCompleter::new(local_config.data_dir.clone(), store_ref);
328
329    intro_messages.push(format!(
330        "afpay v{VERSION} {} mode (remote: {endpoint})",
331        mode_name(frontend)
332    ));
333    intro_messages.push(banner_hint(frontend).to_string());
334
335    Some(InteractiveSessionRuntime {
336        frontend,
337        state,
338        backend: SessionBackend::Remote {
339            endpoint: endpoint.to_string(),
340            secret: secret.to_string(),
341        },
342        completer,
343        history_path: format!("{}/.afpay_history", local_config.data_dir),
344        intro_messages,
345    })
346}