mod app;
mod cache;
mod ui;
use std::{
io::stdout,
panic,
path::{Path, PathBuf},
process::Stdio,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use anyhow::{Context, Result, anyhow};
use crossterm::{
cursor::{Hide, Show},
event::{Event, EventStream, KeyEventKind},
execute,
terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
};
use futures_util::StreamExt;
use ratatui::{Terminal, backend::CrosstermBackend};
use tokio::{
sync::mpsc,
time::{Duration, sleep},
};
use crate::commands::volume::sftp::{
LocalOverwritePolicy, VolumeFileEntry, VolumeSftp, VolumeSftpError, VolumeTransferProgress,
VolumeTransferProgressCallback,
};
use crate::util::{editor::resolve_editor_command, progress::create_spinner};
use app::{
BrowserAction, BrowserMode, BusyState, ConfirmAction, VolumeBrowserApp, normalize_remote_dir,
parent_remote_dir,
};
const PREFETCH_DEPTH: usize = 1;
const MAX_PREFETCH_PER_DIR: usize = 16;
const INITIAL_LOAD_SPINNER_DELAY: Duration = Duration::from_secs(3);
pub struct VolumeBrowserParams {
pub service_instance_id: String,
pub target_name: String,
pub mount_path: String,
pub remote_path: String,
pub transfer_concurrency: usize,
pub editor: Option<String>,
}
struct RefreshResult {
remote_dir: String,
result: Result<Vec<VolumeFileEntry>>,
kind: FetchKind,
select_remote_path: Option<String>,
}
enum EditOutcome {
Changed,
Unchanged,
}
struct FetchRequest {
remote_dir: String,
kind: FetchKind,
select_remote_path: Option<String>,
}
#[derive(Clone)]
struct FetchDispatcher {
active_tx: mpsc::UnboundedSender<FetchRequest>,
background_tx: mpsc::UnboundedSender<FetchRequest>,
}
#[derive(Debug, Clone, Copy)]
enum FetchKind {
Active(u64),
Revalidate,
Prefetch,
}
pub async fn run(params: VolumeBrowserParams) -> Result<()> {
let original_hook = panic::take_hook();
panic::set_hook(Box::new(move |info| {
restore_terminal();
original_hook(info);
}));
let mut app = VolumeBrowserApp::new(
params.target_name.clone(),
params.mount_path.clone(),
params.remote_path.clone(),
)?;
let initial_remote_dir = app.remote_dir.clone();
let mut browser_sftp = VolumeSftp::new(
params.service_instance_id.clone(),
params.mount_path.clone(),
);
let initial_entries = fetch_initial_entries(&mut browser_sftp, &initial_remote_dir).await?;
app.cache
.insert(&initial_remote_dir, initial_entries.clone());
app.apply_remote_entries(initial_entries);
let mut terminal = setup_terminal()?;
let _cleanup = scopeguard::guard((), |_| {
restore_terminal();
});
let mut events = EventStream::new();
let (refresh_tx, mut refresh_rx) = mpsc::unbounded_channel::<RefreshResult>();
let fetch_dispatcher = spawn_fetch_worker(browser_sftp, refresh_tx.clone());
let mut refresh_request_id = 0u64;
let mut active_refresh_request_id = 0u64;
spawn_prefetch(
&fetch_dispatcher,
&app.cache,
&app.remote_dir,
&app.remote_entries,
PREFETCH_DEPTH,
);
let mut render_interval = tokio::time::interval(std::time::Duration::from_millis(16));
render_interval.tick().await;
let mut dirty = true;
'main: loop {
tokio::select! {
biased;
_ = render_interval.tick(), if dirty || app.is_busy() => {
terminal.draw(|frame| ui::render(&app, frame))?;
dirty = false;
}
Some(refresh) = refresh_rx.recv() => {
handle_refresh_result(
&mut app,
refresh,
active_refresh_request_id,
&fetch_dispatcher,
);
dirty = true;
}
Some(Ok(event)) = events.next() => {
match event {
Event::Key(key) => {
if !matches!(key.kind, KeyEventKind::Press | KeyEventKind::Repeat) {
continue;
}
let was_confirming = app.mode == BrowserMode::Confirm;
let action = app.handle_key(key);
if was_confirming && !matches!(action, BrowserAction::Continue) {
terminal.draw(|frame| ui::render(&app, frame))?;
}
match action {
BrowserAction::Continue => {}
BrowserAction::Quit => break 'main,
BrowserAction::Refresh => {
spawn_refresh(
&fetch_dispatcher,
&mut refresh_request_id,
&mut active_refresh_request_id,
app.remote_dir.clone(),
app.selected_remote_path(),
);
app.mark_refreshing();
}
BrowserAction::OpenRemoteDirectory(remote_dir) => {
open_directory(
&mut app,
&fetch_dispatcher,
&mut refresh_request_id,
&mut active_refresh_request_id,
normalize_remote_dir(&remote_dir),
);
}
BrowserAction::Download { local_path, remote_path, is_dir, overwrite_policy, progress_base } => {
let message = if matches!(overwrite_policy, LocalOverwritePolicy::All) {
"Overwriting all..."
} else if matches!(overwrite_policy, LocalOverwritePolicy::Path(_)) {
"Overwriting..."
} else if is_dir {
"Preparing download..."
} else {
"Downloading..."
};
app.mark_busy(BusyState::Downloading, message);
app.transfer_progress = progress_base.clone();
terminal.draw(|frame| ui::render(&app, frame))?;
handle_download(&mut app, &mut terminal, ¶ms, local_path, remote_path, is_dir, overwrite_policy, progress_base).await?;
}
BrowserAction::Upload { local_path, remote_path, overwrite } => {
let message = if overwrite {
"Overwriting..."
} else {
"Uploading..."
};
app.mark_busy(BusyState::Uploading, message);
terminal.draw(|frame| ui::render(&app, frame))?;
let uploaded = handle_upload(&mut app, ¶ms, local_path.clone(), remote_path.clone(), overwrite).await;
if uploaded {
apply_optimistic_upload(&mut app, &local_path, &remote_path);
spawn_revalidate(
&fetch_dispatcher,
parent_remote_dir(&remote_path),
);
}
}
BrowserAction::Delete { remote_path } => {
app.mark_busy(BusyState::Deleting, "Deleting...");
terminal.draw(|frame| ui::render(&app, frame))?;
match handle_delete(¶ms, &remote_path).await {
Ok(()) => {
apply_optimistic_delete(&mut app, &remote_path);
app.set_status(format!("Deleted {remote_path}"));
spawn_revalidate(
&fetch_dispatcher,
app.remote_dir.clone(),
);
}
Err(err) => app.set_error(err.to_string()),
}
}
BrowserAction::Edit { remote_path } => {
app.mark_busy(BusyState::Editing, "Opening editor...");
terminal.draw(|frame| ui::render(&app, frame))?;
restore_terminal();
let edit_result = edit_remote_file(¶ms, &remote_path).await;
terminal = setup_terminal()?;
match edit_result {
Ok(EditOutcome::Changed) => {
app.set_status(format!("Edited and uploaded {remote_path}"));
spawn_revalidate(
&fetch_dispatcher,
parent_remote_dir(&remote_path),
);
}
Ok(EditOutcome::Unchanged) => {
app.set_status(format!("No changes made to {remote_path}"));
}
Err(err) => app.set_error(err.to_string()),
}
}
}
dirty = true;
}
Event::Resize(_, _) => {
terminal.clear()?;
dirty = true;
}
_ => {}
}
}
}
}
Ok(())
}
fn spawn_refresh(
dispatcher: &FetchDispatcher,
refresh_request_id: &mut u64,
active_refresh_request_id: &mut u64,
remote_dir: String,
select_remote_path: Option<String>,
) {
*refresh_request_id += 1;
*active_refresh_request_id = *refresh_request_id;
let request_id = *refresh_request_id;
spawn_fetch(
dispatcher,
remote_dir,
FetchKind::Active(request_id),
select_remote_path,
);
}
fn spawn_revalidate(dispatcher: &FetchDispatcher, remote_dir: String) {
spawn_fetch(dispatcher, remote_dir, FetchKind::Revalidate, None);
}
fn spawn_prefetch(
dispatcher: &FetchDispatcher,
cache: &cache::DirCache,
remote_dir: &str,
entries: &[VolumeFileEntry],
depth: usize,
) {
if depth == 0 {
return;
}
let missing = cache.missing_children(remote_dir, entries, MAX_PREFETCH_PER_DIR);
for child_dir in missing {
spawn_fetch(dispatcher, child_dir, FetchKind::Prefetch, None);
}
}
fn spawn_fetch(
dispatcher: &FetchDispatcher,
remote_dir: String,
kind: FetchKind,
select_remote_path: Option<String>,
) {
let request = FetchRequest {
remote_dir,
kind,
select_remote_path,
};
let tx = match request.kind {
FetchKind::Active(_) => &dispatcher.active_tx,
FetchKind::Revalidate | FetchKind::Prefetch => &dispatcher.background_tx,
};
let _ = tx.send(request);
}
async fn fetch_initial_entries(
sftp: &mut VolumeSftp,
initial_remote_dir: &str,
) -> Result<Vec<VolumeFileEntry>> {
let fetch = fetch_entries(sftp, initial_remote_dir);
tokio::pin!(fetch);
let mut spinner = None;
let result = tokio::select! {
result = &mut fetch => result,
_ = sleep(INITIAL_LOAD_SPINNER_DELAY) => {
spinner = Some(create_spinner(format!("Loading {initial_remote_dir}...")));
fetch.await
}
}
.with_context(|| format!("Failed to load remote directory {initial_remote_dir}"));
if let Some(spinner) = spinner {
spinner.finish_and_clear();
}
result
}
fn spawn_fetch_worker(
mut sftp: VolumeSftp,
tx: mpsc::UnboundedSender<RefreshResult>,
) -> FetchDispatcher {
let (active_tx, mut active_rx) = mpsc::unbounded_channel::<FetchRequest>();
let (background_tx, mut background_rx) = mpsc::unbounded_channel::<FetchRequest>();
tokio::spawn(async move {
loop {
let Some(request) = (tokio::select! {
biased;
request = active_rx.recv() => request,
request = background_rx.recv() => request,
else => None,
}) else {
break;
};
let result = fetch_entries(&mut sftp, &request.remote_dir).await;
if tx
.send(RefreshResult {
remote_dir: request.remote_dir,
result,
kind: request.kind,
select_remote_path: request.select_remote_path,
})
.is_err()
{
break;
}
}
});
FetchDispatcher {
active_tx,
background_tx,
}
}
fn open_directory(
app: &mut VolumeBrowserApp,
dispatcher: &FetchDispatcher,
refresh_request_id: &mut u64,
active_refresh_request_id: &mut u64,
remote_dir: String,
) {
let current_remote_dir = app.remote_dir.clone();
let select_remote_path = if parent_remote_dir(¤t_remote_dir) == remote_dir {
Some(current_remote_dir.clone())
} else {
None
};
let cached = {
let (entries, kind) = app.cache.get(&remote_dir);
entries.map(|entries| (entries.to_vec(), kind))
};
let Some((entries, kind)) = cached else {
if select_remote_path.is_none() {
app.remote_selected = 0;
}
spawn_refresh(
dispatcher,
refresh_request_id,
active_refresh_request_id,
remote_dir,
select_remote_path,
);
app.mark_loading();
return;
};
app.remote_dir = remote_dir.clone();
if select_remote_path.is_none() {
app.remote_selected = 0;
}
app.apply_cached_entries_with_selection(entries.clone(), select_remote_path.as_deref());
if matches!(kind, cache::Lookup::Stale) {
app.mark_revalidating();
spawn_revalidate(dispatcher, remote_dir.clone());
}
spawn_prefetch(
dispatcher,
&app.cache,
&remote_dir,
&entries,
PREFETCH_DEPTH,
);
}
fn handle_refresh_result(
app: &mut VolumeBrowserApp,
refresh: RefreshResult,
active_refresh_request_id: u64,
dispatcher: &FetchDispatcher,
) {
if let Ok(entries) = &refresh.result {
app.cache.insert(&refresh.remote_dir, entries.clone());
}
match refresh.kind {
FetchKind::Active(request_id) => {
if request_id != active_refresh_request_id {
return;
}
app.remote_dir = refresh.remote_dir.clone();
match refresh.result {
Ok(entries) => {
app.apply_remote_entries_with_selection(
entries.clone(),
refresh.select_remote_path.as_deref(),
);
spawn_prefetch(
dispatcher,
&app.cache,
&refresh.remote_dir,
&entries,
PREFETCH_DEPTH,
);
}
Err(err) => app.set_error(err.to_string()),
}
}
FetchKind::Revalidate => {
if refresh.remote_dir != app.remote_dir {
return;
}
match refresh.result {
Ok(entries) => {
app.apply_cached_entries(entries.clone());
spawn_prefetch(
dispatcher,
&app.cache,
&refresh.remote_dir,
&entries,
PREFETCH_DEPTH,
);
}
Err(_) => {}
}
app.clear_revalidating();
}
FetchKind::Prefetch => {
}
}
}
fn apply_optimistic_delete(app: &mut VolumeBrowserApp, remote_path: &str) {
let parent = parent_remote_dir(remote_path);
let name = basename(remote_path);
app.cache.apply_delete(&parent, &name);
app.cache.invalidate_subtree(remote_path);
if app.remote_dir == parent {
app.remote_entries.retain(|entry| entry.name != name);
app.remote_selected = app
.remote_selected
.min(app.remote_entries.len().saturating_sub(1));
}
}
fn apply_optimistic_upload(app: &mut VolumeBrowserApp, local_path: &Path, remote_path: &str) {
let parent = parent_remote_dir(remote_path);
let name = basename(remote_path);
if name.is_empty() {
return;
}
let metadata = std::fs::metadata(local_path).ok();
let is_dir = metadata.as_ref().is_some_and(|m| m.is_dir());
let size = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
let entry = VolumeFileEntry {
name: name.clone(),
path: remote_path.to_string(),
kind: if is_dir { "directory" } else { "file" },
size,
};
app.cache.apply_upsert(&parent, entry.clone());
if app.remote_dir == parent {
if let Some(existing) = app
.remote_entries
.iter_mut()
.find(|existing| existing.name == name)
{
*existing = entry;
} else {
app.remote_entries.push(entry);
app.remote_entries
.sort_by(|a, b| a.name.to_lowercase().cmp(&b.name.to_lowercase()));
}
}
}
fn basename(remote_path: &str) -> String {
remote_path
.rsplit('/')
.find(|segment| !segment.is_empty())
.unwrap_or("")
.to_string()
}
async fn fetch_entries(sftp: &mut VolumeSftp, remote_dir: &str) -> Result<Vec<VolumeFileEntry>> {
sftp.list_files(remote_dir)
.await
.map(|tree| tree.entries().to_vec())
}
async fn handle_download(
app: &mut VolumeBrowserApp,
terminal: &mut Terminal<CrosstermBackend<std::io::Stdout>>,
params: &VolumeBrowserParams,
local_path: PathBuf,
remote_path: String,
is_dir: bool,
overwrite_policy: LocalOverwritePolicy,
progress_base: Option<app::TransferProgressState>,
) -> Result<()> {
let mut sftp = VolumeSftp::new(
params.service_instance_id.clone(),
params.mount_path.clone(),
);
sftp.set_transfer_concurrency(params.transfer_concurrency);
let download_result = if is_dir {
if let LocalOverwritePolicy::Path(overwrite_path) = &overwrite_policy {
let completed = progress_base
.as_ref()
.map_or(0, |progress| progress.completed);
let total = progress_base.as_ref().map_or(1, |progress| progress.total);
app.set_transfer_progress(VolumeTransferProgress {
current_path: overwrite_path.display().to_string(),
completed,
total,
});
terminal.draw(|frame| ui::render(app, frame))?;
}
let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
let progress: VolumeTransferProgressCallback = Arc::new(move |progress| {
let _ = progress_tx.send(progress);
});
let download = sftp.download_dir_with_progress_and_overwrite_policy(
&remote_path,
&local_path,
overwrite_policy,
Some(progress),
);
tokio::pin!(download);
loop {
tokio::select! {
result = &mut download => {
while let Ok(progress) = progress_rx.try_recv() {
app.set_transfer_progress(adjust_progress(progress, progress_base.as_ref()));
terminal.draw(|frame| ui::render(app, frame))?;
}
break result;
}
Some(progress) = progress_rx.recv() => {
app.set_transfer_progress(adjust_progress(progress, progress_base.as_ref()));
terminal.draw(|frame| ui::render(app, frame))?;
}
}
}
} else {
app.set_transfer_progress(VolumeTransferProgress {
current_path: remote_path.clone(),
completed: 0,
total: 1,
});
terminal.draw(|frame| ui::render(app, frame))?;
let overwrite = !matches!(overwrite_policy, LocalOverwritePolicy::None);
sftp.download_file(&remote_path, &local_path, overwrite)
.await
};
match download_result {
Ok(downloaded_path) => app.set_status(format!(
"Downloaded {remote_path} to {}",
downloaded_path.display()
)),
Err(err) => {
if let Some(VolumeSftpError::LocalPathExists(overwrite_path)) =
err.downcast_ref::<VolumeSftpError>()
{
app.request_overwrite(
ConfirmAction::Download,
local_path,
Some(overwrite_path.clone()),
remote_path,
is_dir,
"A local path already exists at the download destination.".to_string(),
);
} else {
app.set_error(err.to_string());
}
}
}
Ok(())
}
fn adjust_progress(
mut progress: VolumeTransferProgress,
base: Option<&app::TransferProgressState>,
) -> VolumeTransferProgress {
let Some(base) = base else {
return progress;
};
let total = base.total.max(progress.total);
progress.completed = base.completed.saturating_add(progress.completed).min(total);
progress.total = total;
progress
}
async fn handle_upload(
app: &mut VolumeBrowserApp,
params: &VolumeBrowserParams,
local_path: PathBuf,
remote_path: String,
overwrite: bool,
) -> bool {
let mut sftp = VolumeSftp::new(
params.service_instance_id.clone(),
params.mount_path.clone(),
);
match sftp.upload(&local_path, &remote_path, overwrite).await {
Ok(uploaded_path) => {
app.set_status(format!(
"Uploaded {} to {uploaded_path}",
local_path.display()
));
true
}
Err(err) => {
if err
.downcast_ref::<VolumeSftpError>()
.is_some_and(|err| matches!(err, VolumeSftpError::RemotePathExists(_)))
{
app.request_overwrite(
ConfirmAction::Upload,
local_path,
None,
remote_path,
false,
"A remote file already exists at the upload destination.".to_string(),
);
} else {
app.set_error(err.to_string());
}
false
}
}
}
async fn handle_delete(params: &VolumeBrowserParams, remote_path: &str) -> Result<()> {
let mut sftp = VolumeSftp::new(
params.service_instance_id.clone(),
params.mount_path.clone(),
);
sftp.delete(remote_path).await
}
async fn edit_remote_file(params: &VolumeBrowserParams, remote_path: &str) -> Result<EditOutcome> {
let temp_path = temp_edit_path(remote_path)?;
let mut sftp = VolumeSftp::new(
params.service_instance_id.clone(),
params.mount_path.clone(),
);
sftp.download(remote_path, &temp_path, true).await?;
let modified_before = modified_at(&temp_path)?;
run_editor(&temp_path, params.editor.as_deref()).await?;
let modified_after = modified_at(&temp_path)?;
if modified_before == modified_after {
let _ = tokio::fs::remove_file(&temp_path).await;
return Ok(EditOutcome::Unchanged);
}
sftp.upload(&temp_path, remote_path, true).await?;
let _ = tokio::fs::remove_file(&temp_path).await;
Ok(EditOutcome::Changed)
}
fn temp_edit_path(remote_path: &str) -> Result<PathBuf> {
let filename = remote_path
.rsplit('/')
.find(|segment| !segment.is_empty())
.unwrap_or("volume-file");
let millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.context("System clock is before UNIX epoch")?
.as_millis();
Ok(std::env::temp_dir().join(format!(
"railway-volume-edit-{}-{millis}-{filename}",
std::process::id()
)))
}
fn modified_at(path: &Path) -> Result<SystemTime> {
std::fs::metadata(path)
.and_then(|metadata| metadata.modified())
.with_context(|| format!("Failed to read modified time for {}", path.display()))
}
async fn run_editor(path: &Path, editor_override: Option<&str>) -> Result<()> {
let editor = resolve_editor_command(editor_override)?;
let command = format!("{} {}", editor, shell_quote(&path.display().to_string()));
let (shell, args): (String, Vec<String>) = if cfg!(target_os = "windows") {
(
std::env::var("COMSPEC").unwrap_or_else(|_| "cmd".to_string()),
vec!["/C".to_string(), command],
)
} else {
(
std::env::var("SHELL").unwrap_or_else(|_| "sh".to_string()),
vec!["-lc".to_string(), command],
)
};
let status = tokio::process::Command::new(shell)
.args(args)
.stdin(Stdio::inherit())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.status()
.await
.with_context(|| format!("Failed to open editor command: {editor}"))?;
if !status.success() {
return Err(anyhow!("Editor exited with status {status}"));
}
Ok(())
}
fn shell_quote(value: &str) -> String {
if cfg!(target_os = "windows") {
format!("\"{}\"", value.replace('"', "\\\""))
} else {
format!("'{}'", value.replace('\'', "'\\''"))
}
}
fn setup_terminal() -> Result<Terminal<CrosstermBackend<std::io::Stdout>>> {
enable_raw_mode()?;
execute!(stdout(), EnterAlternateScreen, Hide)?;
let backend = CrosstermBackend::new(stdout());
let mut terminal = Terminal::new(backend)?;
terminal.clear()?;
Ok(terminal)
}
fn restore_terminal() {
let _ = execute!(stdout(), LeaveAlternateScreen, Show);
let _ = disable_raw_mode();
}