irosh 0.2.0

SSH sessions over Iroh peer-to-peer transport
Documentation
use crate::error::{Result, ServerError, TransportError};
use crate::transport::stream::IrohDuplex;
use crate::transport::transfer::{
    TransferComplete, TransferFailure, TransferFailureCode, TransferFrame, TransferReady,
    read_next_frame, write_put_complete, write_put_ready, write_transfer_error,
};
use tokio::io::AsyncWriteExt;

use crate::server::transfer::ShellContext;
use crate::server::transfer::helpers::{
    PreparedPutDestination, atomic_rename_failure, prepare_put_destination, spawn_upload_helper,
    target_exists_failure,
};

pub(crate) async fn handle_put_request(
    stream: &mut IrohDuplex,
    request: crate::transport::transfer::PutRequest,
    context: ShellContext,
) -> Result<()> {
    if request.recursive {
        return handle_recursive_put_request(stream, request, context).await;
    }

    let prepared = match prepare_put_destination(context, &request.path).await? {
        Some(prepared) => prepared,
        None => {
            let dest_path = context.resolve_path(&request.path).await?;
            write_transfer_error(stream, &target_exists_failure(&dest_path))
                .await
                .map_err(TransportError::from)?;
            return Ok(());
        }
    };
    let PreparedPutDestination {
        final_arg,
        part_arg,
    } = prepared;

    let mut sink = spawn_upload_helper(context, &part_arg).await?;
    let mut transfer_failed = false;
    let mut received = 0u64;
    {
        let mut stdin = sink.stdin().ok_or_else(|| ServerError::TransferFailed {
            details: "upload helper sink unavailable".to_string(),
        })?;

        write_put_ready(
            stream,
            &TransferReady {
                size: request.size,
                mode: request.mode,
            },
        )
        .await
        .map_err(TransportError::from)?;

        loop {
            match read_next_frame(stream)
                .await
                .map_err(TransportError::from)?
            {
                TransferFrame::PutChunk(chunk) => {
                    received += chunk.len() as u64;
                    if let Err(err) = stdin.write_all(&chunk).await {
                        tracing::warn!("Failed to write to upload helper: {}", err);
                        transfer_failed = true;
                        break;
                    }
                }
                TransferFrame::PutComplete(complete) => {
                    if complete.size != received {
                        write_transfer_error(
                            stream,
                            &TransferFailure::new(
                                TransferFailureCode::SizeMismatch,
                                format!("received {}, client reported {}", received, complete.size),
                            ),
                        )
                        .await
                        .map_err(TransportError::from)?;
                        transfer_failed = true;
                    }
                    break;
                }
                TransferFrame::Error(_) => {
                    transfer_failed = true;
                    break;
                }
                other => {
                    let _ = write_transfer_error(
                        stream,
                        &TransferFailure::new(
                            TransferFailureCode::UnexpectedFrame,
                            format!("{other:?}"),
                        ),
                    )
                    .await;
                    transfer_failed = true;
                    break;
                }
            }
        }

        let _ = stdin.flush().await;
    }

    let helper_res = sink.wait().await;

    if transfer_failed || helper_res.is_err() {
        context.remove_file_if_present(&part_arg).await;

        if let Err(err) = helper_res {
            if !transfer_failed {
                write_transfer_error(
                    stream,
                    &TransferFailure::new(TransferFailureCode::HelperFailed, err.to_string()),
                )
                .await
                .map_err(TransportError::from)?;
            }
        }
        return Ok(());
    }

    if !context.rename(&part_arg, &final_arg).await? {
        write_transfer_error(stream, &atomic_rename_failure(&final_arg))
            .await
            .map_err(TransportError::from)?;
        return Ok(());
    }

    if let Some(mode) = request.mode {
        context.chmod(&final_arg, mode).await;
    }

    write_put_complete(stream, &TransferComplete { size: received })
        .await
        .map_err(TransportError::from)?;
    Ok(())
}

async fn handle_recursive_put_request(
    stream: &mut IrohDuplex,
    request: crate::transport::transfer::PutRequest,
    context: ShellContext,
) -> Result<()> {
    let dest_root = context.resolve_path(&request.path).await?;
    context.create_dir_all(&dest_root).await?;

    write_put_ready(
        stream,
        &TransferReady {
            size: 0,
            mode: None,
        },
    )
    .await
    .map_err(TransportError::from)?;

    let mut total_received = 0u64;
    loop {
        match read_next_frame(stream)
            .await
            .map_err(TransportError::from)?
        {
            TransferFrame::NewEntry(header) => {
                let full_path = dest_root.join(&header.path);
                let full_path_str = full_path.display().to_string();

                if header.is_dir {
                    context.create_dir_all(&full_path).await?;
                    if let Some(mode) = header.mode {
                        context.chmod(&full_path_str, mode).await;
                    }
                } else {
                    // Use atomic rename pattern for each file in the recursive stream
                    let prepared = match prepare_put_destination(context, &full_path_str).await? {
                        Some(p) => p,
                        None => {
                            write_transfer_error(stream, &target_exists_failure(&full_path))
                                .await
                                .map_err(TransportError::from)?;
                            return Ok(()); // Fail whole recursive transfer on collision
                        }
                    };

                    let mut sink = spawn_upload_helper(context, &prepared.part_arg).await?;
                    let mut entry_failed = false;
                    let mut file_received = 0u64;
                    {
                        let mut stdin =
                            sink.stdin().ok_or_else(|| ServerError::TransferFailed {
                                details: "upload helper sink unavailable".to_string(),
                            })?;
                        loop {
                            match read_next_frame(stream)
                                .await
                                .map_err(TransportError::from)?
                            {
                                TransferFrame::PutChunk(chunk) => {
                                    file_received += chunk.len() as u64;
                                    if let Err(e) = stdin.write_all(&chunk).await {
                                        tracing::warn!("Failed to write to upload helper: {}", e);
                                        entry_failed = true;
                                        break;
                                    }
                                }
                                TransferFrame::EntryComplete(_) => break,
                                other => {
                                    return Err(ServerError::TransferFailed {
                                        details: format!(
                                            "unexpected frame during recursive entry stream: {other:?}"
                                        ),
                                    }
                                    .into());
                                }
                            }
                        }
                        let _ = stdin.flush().await;
                    }
                    let helper_res = sink.wait().await;

                    if entry_failed || helper_res.is_err() {
                        context.remove_file_if_present(&prepared.part_arg).await;
                        return Err(ServerError::TransferFailed {
                            details: format!(
                                "recursive entry upload failed: {}",
                                helper_res.err().map(|e| e.to_string()).unwrap_or_default()
                            ),
                        }
                        .into());
                    }

                    // Perform atomic rename
                    if !context
                        .rename(&prepared.part_arg, &prepared.final_arg)
                        .await?
                    {
                        return Err(ServerError::TransferFailed {
                            details: format!("atomic rename failed for {}", prepared.final_arg),
                        }
                        .into());
                    }

                    if let Some(mode) = header.mode {
                        context.chmod(&prepared.final_arg, mode).await;
                    }
                    total_received += file_received;
                }
            }
            TransferFrame::PutComplete(complete) => {
                write_put_complete(
                    stream,
                    &TransferComplete {
                        size: total_received,
                    },
                )
                .await
                .map_err(TransportError::from)?;
                let _ = complete;
                return Ok(());
            }
            TransferFrame::Error(e) => {
                return Err(ServerError::TransferFailed {
                    details: e.to_string(),
                }
                .into());
            }
            other => {
                return Err(ServerError::TransferFailed {
                    details: format!("unexpected frame during recursive upload: {other:?}"),
                }
                .into());
            }
        }
    }
}