irosh 0.1.0

SSH sessions over Iroh peer-to-peer transport
Documentation
use crate::error::{Result, ServerError, TransportError};
use crate::transport::stream::IrohDuplex;
use crate::transport::transfer::{
    TransferComplete, TransferFailure, TransferFailureCode, TransferFrame, TransferReady,
    read_next_frame, write_put_complete, write_put_ready, write_transfer_error,
};
use tokio::io::AsyncWriteExt;

use crate::server::transfer::helpers::{
    PreparedPutDestination, atomic_rename_failure, prepare_put_destination, spawn_upload_helper,
    target_exists_failure,
};
use crate::server::transfer::{ConnectionShellState, LiveShellContext, resolve_remote_path};

pub(crate) async fn handle_put_request(
    stream: &mut IrohDuplex,
    request: crate::transport::transfer::PutRequest,
    shell_state: ConnectionShellState,
) -> Result<()> {
    let Some(shell) = LiveShellContext::from_state(&shell_state) else {
        write_transfer_error(
            stream,
            &TransferFailure::new(
                TransferFailureCode::RemoteShellUnavailable,
                "no live shell process",
            ),
        )
        .await
        .map_err(TransportError::from)?;
        return Ok(());
    };

    let prepared = match prepare_put_destination(shell, &request.path).await? {
        Some(prepared) => prepared,
        None => {
            let dest_path = resolve_remote_path(&request.path)?;
            write_transfer_error(stream, &target_exists_failure(&dest_path))
                .await
                .map_err(TransportError::from)?;
            return Ok(());
        }
    };
    let PreparedPutDestination {
        final_arg,
        part_arg,
    } = prepared;

    let mut child = spawn_upload_helper(shell, &part_arg).await?;
    let mut stdin = child
        .stdin
        .take()
        .ok_or_else(|| ServerError::TransferFailed {
            details: "upload helper stdin unavailable".to_string(),
        })?;

    write_put_ready(
        stream,
        &TransferReady {
            size: request.size,
            mode: request.mode,
        },
    )
    .await
    .map_err(TransportError::from)?;

    let mut received = 0u64;
    let mut transfer_failed = false;
    loop {
        match read_next_frame(stream)
            .await
            .map_err(TransportError::from)?
        {
            TransferFrame::PutChunk(chunk) => {
                received += chunk.len() as u64;
                if let Err(err) = stdin.write_all(&chunk).await {
                    tracing::warn!("Failed to write to upload helper: {}", err);
                    transfer_failed = true;
                    break;
                }
            }
            TransferFrame::PutComplete(complete) => {
                if complete.size != received {
                    write_transfer_error(
                        stream,
                        &TransferFailure::new(
                            TransferFailureCode::SizeMismatch,
                            format!("received {}, client reported {}", received, complete.size),
                        ),
                    )
                    .await
                    .map_err(TransportError::from)?;
                    transfer_failed = true;
                }
                break;
            }
            TransferFrame::Error(_) => {
                transfer_failed = true;
                break;
            }
            other => {
                let _ = write_transfer_error(
                    stream,
                    &TransferFailure::new(
                        TransferFailureCode::UnexpectedFrame,
                        format!("{other:?}"),
                    ),
                )
                .await;
                transfer_failed = true;
                break;
            }
        }
    }

    let _ = stdin.flush().await;
    drop(stdin);

    let output = child
        .wait_with_output()
        .await
        .map_err(|e| ServerError::TransferFailed {
            details: format!("waiting for upload helper failed: {e}"),
        })?;

    if transfer_failed || !output.status.success() {
        shell.remove_file_if_present(&part_arg).await;

        if !output.status.success() && !transfer_failed {
            write_transfer_error(
                stream,
                &TransferFailure::new(
                    TransferFailureCode::HelperFailed,
                    String::from_utf8_lossy(&output.stderr).trim().to_string(),
                ),
            )
            .await
            .map_err(TransportError::from)?;
        }
        return Ok(());
    }

    if !shell.rename(&part_arg, &final_arg).await? {
        write_transfer_error(stream, &atomic_rename_failure(&final_arg))
            .await
            .map_err(TransportError::from)?;
        return Ok(());
    }

    if let Some(mode) = request.mode {
        shell.chmod(&final_arg, mode).await;
    }

    write_put_complete(stream, &TransferComplete { size: received })
        .await
        .map_err(TransportError::from)?;
    Ok(())
}