use anyhow::{bail, Context, Result};
use clap::Args;
use std::io::{self, Read};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(Args, Debug)]
pub struct SendArgs {
#[arg(long, short = 'n', value_name = "NAME")]
pub name: String,
#[arg(value_name = "MESSAGE")]
pub message: Option<String>,
}
pub async fn execute(args: &SendArgs) -> Result<()> {
let message = match &args.message {
Some(m) => m.trim().to_string(),
None => {
if io::IsTerminal::is_terminal(&io::stdin()) {
bail!("message must not be empty (pass as argument or pipe via stdin)");
}
let mut buf = String::new();
io::stdin()
.read_to_string(&mut buf)
.context("failed to read message from stdin")?;
buf.trim().to_string()
}
};
if message.is_empty() {
bail!("message must not be empty (pass as argument or pipe via stdin)");
}
send_message(&args.name, &message).await?;
println!("Sent to session '{}'.", args.name);
Ok(())
}
#[cfg(unix)]
async fn send_message(session_name: &str, message: &str) -> Result<()> {
use tokio::net::UnixStream;
let sock_path = octomind::directories::get_run_dir()
.context("failed to resolve run directory")?
.join(format!("{}.sock", session_name));
if !sock_path.exists() {
bail!(
"no running session named '{}' (socket not found at {:?})",
session_name,
sock_path
);
}
let mut stream = UnixStream::connect(&sock_path)
.await
.with_context(|| format!("failed to connect to session '{}'", session_name))?;
stream
.write_all(message.as_bytes())
.await
.context("failed to send message")?;
stream
.shutdown()
.await
.context("failed to shut down write half")?;
read_response(&mut stream, session_name).await
}
#[cfg(windows)]
async fn send_message(session_name: &str, message: &str) -> Result<()> {
use std::time::Duration;
use tokio::net::windows::named_pipe::ClientOptions;
const ERROR_PIPE_BUSY: i32 = 231;
const ERROR_FILE_NOT_FOUND: i32 = 2;
let pipe_name = format!(r"\\.\pipe\octomind-{}", session_name);
let mut client = loop {
match ClientOptions::new().open(&pipe_name) {
Ok(c) => break c,
Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY) => {
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(e) if e.raw_os_error() == Some(ERROR_FILE_NOT_FOUND) => {
bail!(
"no running session named '{}' (named pipe not found: {})",
session_name,
pipe_name
);
}
Err(e) => {
return Err(e)
.with_context(|| format!("failed to connect to session '{}'", session_name));
}
}
};
client
.write_all(message.as_bytes())
.await
.context("failed to send message")?;
client
.shutdown()
.await
.context("failed to shut down write half")?;
read_response(&mut client, session_name).await
}
async fn read_response<S>(stream: &mut S, session_name: &str) -> Result<()>
where
S: AsyncReadExt + Unpin,
{
let mut response = String::new();
stream
.read_to_string(&mut response)
.await
.context("failed to read response")?;
let response = response.trim();
if response == "ok" {
Ok(())
} else {
bail!("session '{}' returned: {}", session_name, response);
}
}