remowt-client 0.1.2

russh-based client connection to a remowt agent
Documentation
use std::io;

use bifrostlink::Port;
use bytes::{Bytes, BytesMut};
use russh::{Channel, ChannelStream};
use russh::client::Msg;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, ReadHalf, WriteHalf};
use tokio::join;
use tracing::error;

async fn read(srx: &mut ReadHalf<ChannelStream<Msg>>) -> io::Result<BytesMut> {
	let len = srx.read_u32().await?;
	let mut buf = BytesMut::zeroed(len as usize);
	srx.read_exact(&mut buf).await?;
	Ok(buf)
}
async fn write(stx: &mut WriteHalf<ChannelStream<Msg>>, value: Bytes) -> io::Result<()> {
	stx.write_u32(value.len().try_into().expect("can't be larger"))
		.await?;
	stx.write_all(&value).await?;
	Ok(())
}

pub fn channel_port(ch: Channel<Msg>) -> Port {
	Port::new(move |mut rx, tx| async move {
		let (mut srx, mut stx) = tokio::io::split(ch.into_stream());
		let srx_task = async move {
			loop {
				match read(&mut srx).await {
					Ok(buf) => {
						if tx.send(buf.freeze()).is_err() {
							break;
						}
					}
					Err(e) => {
						error!("channel read failed: {e}");
						break;
					}
				}
			}
		};
		let stx_task = async move {
			while let Some(value) = rx.recv().await {
				if let Err(e) = write(&mut stx, value).await {
					error!("channel write failed: {e}");
					break;
				}
			}
		};
		join!(srx_task, stx_task);
	})
}