irosh 0.1.0

SSH sessions over Iroh peer-to-peer transport
Documentation
use tokio::io::AsyncReadExt;

use crate::client::{Session, TransferProgress};
use crate::error::{ClientError, Result, TransportError};
use crate::transport::transfer::{
    MAX_CHUNK_BYTES, TransferFrame, TransferReady, read_next_frame, write_put_chunk,
    write_put_complete, write_put_request,
};

impl Session {
    /// Uploads one local file to the remote peer on a separate authenticated stream.
    pub async fn put_file(
        &mut self,
        local: impl AsRef<std::path::Path>,
        remote: impl AsRef<std::path::Path>,
    ) -> Result<()> {
        self.put_file_with_progress(local, remote, |_| {}).await
    }

    /// Uploads one local file and reports progress synchronously through the callback.
    pub async fn put_file_with_progress<F>(
        &mut self,
        local: impl AsRef<std::path::Path>,
        remote: impl AsRef<std::path::Path>,
        mut on_progress: F,
    ) -> Result<()>
    where
        F: FnMut(TransferProgress),
    {
        let local = local.as_ref();
        let remote = remote.as_ref();
        if remote.as_os_str().is_empty() {
            return Err(ClientError::TransferTargetInvalid {
                reason: "remote path is empty",
            }
            .into());
        }

        let mut stream = self.open_transfer_stream("upload unavailable").await?;

        let mut file =
            tokio::fs::File::open(local)
                .await
                .map_err(|source| ClientError::FileIo {
                    operation: "open local source file",
                    path: local.to_path_buf(),
                    source,
                })?;
        let metadata = file
            .metadata()
            .await
            .map_err(|source| ClientError::FileIo {
                operation: "read local source metadata",
                path: local.to_path_buf(),
                source,
            })?;
        let size = metadata.len();
        #[cfg(unix)]
        let mode = {
            use std::os::unix::fs::PermissionsExt;
            Some(metadata.permissions().mode() & 0o777)
        };
        #[cfg(not(unix))]
        let mode = None;

        on_progress(TransferProgress::new(0, size));

        write_put_request(
            &mut stream,
            &crate::transport::transfer::PutRequest {
                path: remote.display().to_string(),
                size,
                mode,
            },
        )
        .await
        .map_err(TransportError::from)?;

        match read_next_frame(&mut stream)
            .await
            .map_err(TransportError::from)?
        {
            TransferFrame::PutReady(TransferReady {
                size: remote_size, ..
            }) => {
                if remote_size != size {
                    return Err(ClientError::UploadFailed {
                        details: format!(
                            "remote acknowledged unexpected size {remote_size}, expected {size}"
                        ),
                    }
                    .into());
                }
            }
            TransferFrame::Error(details) => {
                return Err(ClientError::TransferRejected {
                    details: details.to_string(),
                }
                .into());
            }
            other => {
                return Err(ClientError::UploadFailed {
                    details: format!("unexpected preflight frame: {other:?}"),
                }
                .into());
            }
        }

        let mut sent = 0u64;
        let mut buffer = vec![0u8; MAX_CHUNK_BYTES];
        loop {
            let count = file
                .read(&mut buffer)
                .await
                .map_err(|source| ClientError::FileIo {
                    operation: "read local source file",
                    path: local.to_path_buf(),
                    source,
                })?;
            if count == 0 {
                break;
            }
            sent += count as u64;
            write_put_chunk(&mut stream, &buffer[..count])
                .await
                .map_err(TransportError::from)?;
            on_progress(TransferProgress::new(sent, size));
        }

        write_put_complete(
            &mut stream,
            &crate::transport::transfer::TransferComplete { size: sent },
        )
        .await
        .map_err(TransportError::from)?;

        match read_next_frame(&mut stream)
            .await
            .map_err(TransportError::from)?
        {
            TransferFrame::PutComplete(complete) if complete.size == sent => Ok(()),
            TransferFrame::PutComplete(complete) => Err(ClientError::UploadFailed {
                details: format!(
                    "remote reported {} bytes saved, expected {sent}",
                    complete.size
                ),
            }
            .into()),
            TransferFrame::Error(details) => Err(ClientError::TransferRejected {
                details: details.to_string(),
            }
            .into()),
            other => Err(ClientError::UploadFailed {
                details: format!("unexpected completion frame: {other:?}"),
            }
            .into()),
        }
    }
}