alpaca-trader-rs 0.6.0

Alpaca Markets trading toolkit — async REST client library and interactive TUI trading terminal
Documentation
use std::io;
use std::sync::Arc;
use std::time::Duration;

use clap::Parser;
use crossterm::{
    event::EnableMouseCapture,
    execute,
    terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{backend::CrosstermBackend, Terminal};
use tokio::sync::{mpsc, watch, Notify};
use tokio_util::sync::CancellationToken;

use alpaca_trader_rs::{
    client::AlpacaClient,
    config::{AlpacaConfig, AlpacaEnv},
    events::Event,
    prefs::AppPrefs,
};

/// Alpaca Markets TUI trading terminal.
///
/// Connects to the **live** account by default. Pass `--paper` to use the
/// paper-trading environment (simulated funds, no real money at risk).
#[derive(Parser)]
#[command(version, about)]
struct Args {
    /// Connect to the paper-trading environment (simulated funds).
    /// Omit to use the live account (real money — default).
    #[arg(long)]
    paper: bool,

    /// Simulate order submissions without sending them to Alpaca.
    ///
    /// All read-only operations (account info, positions, watchlist …) still
    /// use the configured environment. Only order submissions are intercepted
    /// and displayed as `[DRY-RUN]` in the UI without being transmitted.
    #[arg(long)]
    dry_run: bool,

    /// Delete stored keychain credentials for the given environment and exit.
    ///
    /// Example: `alpaca-trader --reset paper` or `alpaca-trader --reset live`
    #[arg(long, value_name = "ENV", value_parser = ["paper", "live"])]
    reset: Option<String>,
}

// Bridge library modules into the binary crate so sub-modules can use `crate::config` etc.
mod client {
    pub use alpaca_trader_rs::client::*;
}
mod commands {
    pub use alpaca_trader_rs::commands::*;
}
mod config {
    pub use alpaca_trader_rs::config::*;
}
mod events {
    pub use alpaca_trader_rs::events::*;
}
mod prefs {
    pub use alpaca_trader_rs::prefs::*;
}

mod types {
    pub use alpaca_trader_rs::types::*;
}

mod app;
mod clipboard;
mod credentials;
mod handlers;
mod input;
mod ui;
mod update;

use app::App;
use update::update;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    dotenvy::dotenv().ok();

    // ── --reset: delete keychain credentials and exit ──────────────────────────
    if let Some(ref env_str) = args.reset {
        let env = if env_str == "paper" {
            AlpacaEnv::Paper
        } else {
            AlpacaEnv::Live
        };
        credentials::reset(env);
        return Ok(());
    }

    // ── Logging — must come before enable_raw_mode() ───────────────────────────
    let _log_guard = alpaca_trader_rs::logging::init().unwrap_or_else(|e| {
        eprintln!("Warning: failed to initialise logging: {e}");
        // Return a no-op guard by creating a throwaway channel
        let (_, nb) = tracing_appender::non_blocking(std::io::sink());
        nb
    });

    // ── Load user preferences ──────────────────────────────────────────────────
    let prefs = AppPrefs::load();

    let env = if args.paper || prefs.app.default_env == "paper" {
        AlpacaEnv::Paper
    } else {
        AlpacaEnv::Live
    };

    // Resolve credentials before entering raw-mode (may print/prompt to terminal).
    let creds = credentials::resolve(env).map_err(|e| {
        tracing::error!(error = %e, "credential resolution failed");
        eprintln!("Error: {e}");
        e
    })?;

    let config = AlpacaConfig::from_credentials(creds)
        .map_err(|e| {
            tracing::error!(error = %e, "configuration error");
            eprintln!("Configuration error: {e}");
            e
        })?
        .with_dry_run(args.dry_run);

    tracing::info!(env = config.env_label(), "alpaca-trader starting");

    let client = Arc::new(AlpacaClient::new(config.clone()));
    let refresh_notify = Arc::new(Notify::new());

    // Command channel — sync update() → async command handler
    let (command_tx, command_rx) = mpsc::channel::<alpaca_trader_rs::commands::Command>(8);

    // Symbol watch channel — pushes watchlist symbols to the market stream
    let (symbol_tx, symbol_rx) = watch::channel::<Vec<String>>(vec![]);

    // ── Terminal setup ─────────────────────────────────────────────────────────
    enable_raw_mode()?;
    let mut stdout = io::stdout();
    execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
    let backend = CrosstermBackend::new(stdout);
    let mut terminal = Terminal::new(backend)?;

    let mut app = App::new(
        config.clone(),
        prefs.clone(),
        refresh_notify.clone(),
        command_tx,
        symbol_tx,
    );
    // Show "Loading…" while initial data is being fetched.
    app.push_status(app::StatusMessage::persistent("Loading…"));

    // Event channel
    let (tx, mut rx) = mpsc::channel::<Event>(256);

    // Cancellation token shared by all background tasks
    let cancel = CancellationToken::new();

    // ── Background tasks ───────────────────────────────────────────────────────

    // Input task
    tokio::spawn(handlers::input::run(tx.clone(), cancel.clone()));

    // REST polling task
    tokio::spawn(handlers::rest::run(
        tx.clone(),
        cancel.clone(),
        client.clone(),
        refresh_notify.clone(),
        prefs.clone(),
    ));

    // Command execution task (order submit, cancel, watchlist mutations)
    tokio::spawn(handlers::commands::run(
        command_rx,
        tx.clone(),
        client.clone(),
        refresh_notify.clone(),
        cancel.clone(),
    ));

    // Market data WebSocket stream
    tokio::spawn(alpaca_trader_rs::stream::market::run(
        tx.clone(),
        cancel.clone(),
        config.clone(),
        symbol_rx,
        prefs.clone(),
    ));

    // Account/trade updates WebSocket stream
    tokio::spawn(alpaca_trader_rs::stream::account::run(
        tx.clone(),
        cancel.clone(),
        config.clone(),
        prefs.clone(),
    ));

    // Tick task — drives clock refresh every 250 ms
    {
        let tx = tx.clone();
        let cancel = cancel.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(250));
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if tx.send(Event::Tick).await.is_err() { break; }
                    }
                    _ = cancel.cancelled() => break,
                }
            }
        });
    }

    // Initial data load before first render
    tokio::spawn(handlers::rest::poll_once(tx.clone(), client.clone()));

    tracing::info!("all tasks spawned, entering main loop");

    // ── Main loop ──────────────────────────────────────────────────────────────
    loop {
        terminal.draw(|f| ui::render(f, &mut app))?;

        match rx.recv().await {
            Some(Event::Quit) | None => break,
            Some(event) => update(&mut app, event),
        }

        // If the update requested an immediate redraw (e.g. terminal resize),
        // draw again before blocking for the next event so the layout adapts
        // right away instead of waiting for the next periodic tick.
        if app.needs_redraw {
            app.needs_redraw = false;
            terminal.draw(|f| ui::render(f, &mut app))?;
        }

        if app.should_quit {
            break;
        }
    }

    // ── Cleanup ────────────────────────────────────────────────────────────────
    tracing::info!("shutting down");
    cancel.cancel();
    disable_raw_mode()?;
    execute!(
        terminal.backend_mut(),
        LeaveAlternateScreen,
        crossterm::event::DisableMouseCapture
    )?;
    terminal.show_cursor()?;

    Ok(())
}