use super::{
error::{AskError, ExecAskError, FileAskError, TellError},
file::RemoteFile,
proc::RemoteProc,
state::ClientState,
};
use crate::{
event::{AddrEventManager, EventManager},
msg::{content::*, Msg},
};
use log::{error, trace};
use over_there_utils::Either;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::{
sync::{oneshot, Mutex},
task::{JoinError, JoinHandle},
};
/// Client handle bound to a single remote address.
///
/// Bundles the shared client state, the event manager used to send data, and
/// the join handle that `wait` awaits together with the event manager.
pub struct ConnectedClient {
/// Shared client state; `ask` locks it to register response callbacks on
/// its `callback_manager`.
pub(super) state: Arc<Mutex<ClientState>>,
/// Outbound transport: `tell` calls `send` on the left variant and
/// `send_to` with `remote_addr` on the right variant.
pub(super) event_manager: Either<EventManager, AddrEventManager>,
/// Task handle that `wait` joins alongside the event manager.
pub(super) event_handle: JoinHandle<()>,
/// Address reported by `remote_addr()` and used as the `send_to` target.
pub(super) remote_addr: SocketAddr,
/// How long `ask` waits for a response before failing with a timeout.
pub timeout: Duration,
}
impl ConnectedClient {
/// Five-second duration.
/// NOTE(review): presumably the initial value of `timeout` — confirm where the client is constructed.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}
pub async fn wait(self) -> Result<(), JoinError> {
match self.event_manager {
Either::Left(m) => {
tokio::try_join!(m.wait(), self.event_handle).map(|_| ())
}
Either::Right(m) => {
tokio::try_join!(m.wait(), self.event_handle).map(|_| ())
}
}
}
pub async fn ask(&mut self, msg: Msg) -> Result<Msg, AskError> {
let timeout = self.timeout;
let (tx, rx) = oneshot::channel::<Result<Msg, AskError>>();
self.state.lock().await.callback_manager.add_callback(
msg.header.id,
|msg| {
let result = if let Content::Error(args) = &msg.content {
tx.send(Err(AskError::Failure {
msg: args.msg.to_string(),
}))
} else {
tx.send(Ok(msg.clone()))
};
if result.is_err() {
error!("Failed to trigger callback: {:?}", msg);
}
},
);
self.tell(msg).await.map_err(AskError::from)?;
tokio::time::timeout(timeout, rx)
.await
.map_err(|_| AskError::Timeout)?
.map_err(|_| AskError::CallbackLost)?
}
/// Encodes `msg` and sends it through the event manager without waiting for
/// any reply.
///
/// # Errors
///
/// * `TellError::EncodingFailed` when `msg.to_vec()` fails.
/// * `TellError::SendFailed` when the event manager reports a send error.
pub async fn tell(&mut self, msg: Msg) -> Result<(), TellError> {
    trace!("Sending to {}: {:?}", self.remote_addr, msg);

    let payload = match msg.to_vec() {
        Ok(bytes) => bytes,
        Err(_) => return Err(TellError::EncodingFailed),
    };
    let destination = self.remote_addr;

    match &mut self.event_manager {
        // The left manager sends without an explicit destination.
        Either::Left(manager) => manager
            .send(payload)
            .await
            .map_err(|_| TellError::SendFailed),
        // The right manager needs the destination address on each send.
        Either::Right(manager) => manager
            .send_to(payload, destination)
            .await
            .map_err(|_| TellError::SendFailed),
    }
}
/// Sends a `DoGetVersion` request and returns the arguments of the
/// `Version` reply.
///
/// # Errors
///
/// Propagates any `ask` failure, or returns the error built by
/// `make_ask_error` when the reply is not `Content::Version`.
pub async fn ask_version(&mut self) -> Result<VersionArgs, AskError> {
    self.ask(Msg::from(Content::DoGetVersion))
        .await
        .and_then(|reply| match reply.content {
            Content::Version(args) => Ok(args),
            unexpected => Err(make_ask_error(unexpected)),
        })
}
/// Sends a `DoGetCapabilities` request and returns the arguments of the
/// `Capabilities` reply.
///
/// # Errors
///
/// Propagates any `ask` failure, or returns the error built by
/// `make_ask_error` when the reply is not `Content::Capabilities`.
pub async fn ask_capabilities(
    &mut self,
) -> Result<CapabilitiesArgs, AskError> {
    self.ask(Msg::from(Content::DoGetCapabilities))
        .await
        .and_then(|reply| match reply.content {
            Content::Capabilities(args) => Ok(args),
            unexpected => Err(make_ask_error(unexpected)),
        })
}
/// Sends a `DoCreateDir` request for `path` and returns the arguments of the
/// `DirCreated` reply.
///
/// `include_components` is forwarded unchanged in the request.
/// NOTE(review): presumably it controls creation of missing parent
/// directories — confirm against the server.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::DirCreated`.
pub async fn ask_create_dir(
    &mut self,
    path: String,
    include_components: bool,
) -> Result<DirCreatedArgs, FileAskError> {
    let request = Msg::from(Content::DoCreateDir(DoCreateDirArgs {
        path,
        include_components,
    }));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::DirCreated(args) => Ok(args),
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Sends a `DoRenameDir` request with the `from` and `to` paths and returns
/// the arguments of the `DirRenamed` reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::DirRenamed`.
pub async fn ask_rename_dir(
    &mut self,
    from: String,
    to: String,
) -> Result<DirRenamedArgs, FileAskError> {
    let request =
        Msg::from(Content::DoRenameDir(DoRenameDirArgs { from, to }));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::DirRenamed(args) => Ok(args),
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Sends a `DoRemoveDir` request for `path` and returns the arguments of the
/// `DirRemoved` reply.
///
/// `non_empty` is forwarded unchanged in the request.
/// NOTE(review): presumably it allows removing directories that still have
/// contents — confirm against the server.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::DirRemoved`.
pub async fn ask_remove_dir(
    &mut self,
    path: String,
    non_empty: bool,
) -> Result<DirRemovedArgs, FileAskError> {
    let request =
        Msg::from(Content::DoRemoveDir(DoRemoveDirArgs { path, non_empty }));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::DirRemoved(args) => Ok(args),
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Lists directory contents for the path `"."` by delegating to
/// `ask_list_dir_contents`.
///
/// # Errors
///
/// Returns whatever `ask_list_dir_contents` returns.
pub async fn ask_list_root_dir_contents(
    &mut self,
) -> Result<DirContentsListArgs, FileAskError> {
    let root = ".".to_string();
    self.ask_list_dir_contents(root).await
}
/// Sends a `DoListDirContents` request for `path` and returns the arguments
/// of the `DirContentsList` reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::DirContentsList`.
pub async fn ask_list_dir_contents(
    &mut self,
    path: String,
) -> Result<DirContentsListArgs, FileAskError> {
    let request = Msg::from(Content::DoListDirContents(
        DoListDirContentsArgs { path },
    ));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::DirContentsList(args) => Ok(args),
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Opens the file at `path` by delegating to `ask_open_file_with_options`
/// with the create, write and read flags all set to `true`.
///
/// # Errors
///
/// Returns whatever `ask_open_file_with_options` returns.
pub async fn ask_open_file(
    &mut self,
    path: String,
) -> Result<FileOpenedArgs, FileAskError> {
    let (create, write, read) = (true, true, true);
    self.ask_open_file_with_options(path, create, write, read)
        .await
}
/// Sends a `DoOpenFile` request for `path` and returns the arguments of the
/// `FileOpened` reply.
///
/// `create`, `write` and `read` are forwarded as the request's
/// `create_if_missing`, `write_access` and `read_access` fields.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::FileOpened`.
pub async fn ask_open_file_with_options(
    &mut self,
    path: String,
    create: bool,
    write: bool,
    read: bool,
) -> Result<FileOpenedArgs, FileAskError> {
    // `path` is owned and not needed afterwards, so it is moved into the
    // request rather than cloned.
    let request = Msg::from(Content::DoOpenFile(DoOpenFileArgs {
        path,
        create_if_missing: create,
        write_access: write,
        read_access: read,
    }));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::FileOpened(args) => Ok(args),
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Sends a `DoCloseFile` request carrying the `id` and `sig` of `file` and
/// returns the arguments of the `FileClosed` reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::FileClosed`.
pub async fn ask_close_file(
    &mut self,
    file: &RemoteFile,
) -> Result<FileClosedArgs, FileAskError> {
    let request = Msg::from(Content::DoCloseFile(DoCloseFileArgs {
        id: file.id,
        sig: file.sig,
    }));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::FileClosed(args) => Ok(args),
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Sends a `DoRenameFile` request for the open `file` with the new name `to`
/// and returns the arguments of the `FileRenamed` reply.
///
/// On success `file.sig` is overwritten with the signature from the reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::FileRenamed`. `file` is left untouched on error.
pub async fn ask_rename_file(
    &mut self,
    file: &mut RemoteFile,
    to: String,
) -> Result<FileRenamedArgs, FileAskError> {
    let request = Msg::from(Content::DoRenameFile(DoRenameFileArgs {
        id: file.id,
        sig: file.sig,
        to,
    }));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::FileRenamed(args) => {
            // Store the signature returned in the reply on the local handle.
            file.sig = args.sig;
            Ok(args)
        }
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Sends a `DoRenameUnopenedFile` request with the `from` and `to` paths and
/// returns the arguments of the `UnopenedFileRenamed` reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::UnopenedFileRenamed`.
pub async fn ask_rename_unopened_file(
    &mut self,
    from: String,
    to: String,
) -> Result<UnopenedFileRenamedArgs, FileAskError> {
    let request = Msg::from(Content::DoRenameUnopenedFile(
        DoRenameUnopenedFileArgs { from, to },
    ));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::UnopenedFileRenamed(args) => Ok(args),
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Sends a `DoRemoveFile` request carrying the `id` and `sig` of `file` and
/// returns the arguments of the `FileRemoved` reply.
///
/// On success `file.sig` is overwritten with the signature from the reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::FileRemoved`. `file` is left untouched on error.
pub async fn ask_remove_file(
    &mut self,
    file: &mut RemoteFile,
) -> Result<FileRemovedArgs, FileAskError> {
    let request = Msg::from(Content::DoRemoveFile(DoRemoveFileArgs {
        id: file.id,
        sig: file.sig,
    }));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::FileRemoved(args) => {
            // Store the signature returned in the reply on the local handle.
            file.sig = args.sig;
            Ok(args)
        }
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Sends a `DoRemoveUnopenedFile` request for `path` and returns the
/// arguments of the `UnopenedFileRemoved` reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::UnopenedFileRemoved`.
pub async fn ask_remove_unopened_file(
    &mut self,
    path: String,
) -> Result<UnopenedFileRemovedArgs, FileAskError> {
    let request = Msg::from(Content::DoRemoveUnopenedFile(
        DoRemoveUnopenedFileArgs { path },
    ));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::UnopenedFileRemoved(args) => Ok(args),
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Sends a `DoReadFile` request carrying the `id` and `sig` of `file` and
/// returns the arguments of the `FileContents` reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::FileContents`.
pub async fn ask_read_file(
    &mut self,
    file: &RemoteFile,
) -> Result<FileContentsArgs, FileAskError> {
    let request = Msg::from(Content::DoReadFile(DoReadFileArgs {
        id: file.id,
        sig: file.sig,
    }));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::FileContents(args) => Ok(args),
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Sends a `DoWriteFile` request carrying the `id` and `sig` of `file` plus a
/// copy of `contents`, and returns the arguments of the `FileWritten` reply.
///
/// On success `file.sig` is overwritten with the signature from the reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `FileAskError`, or the error
/// built by `make_file_ask_error` when the reply is not
/// `Content::FileWritten`. `file` is left untouched on error.
pub async fn ask_write_file(
    &mut self,
    file: &mut RemoteFile,
    contents: &[u8],
) -> Result<FileWrittenArgs, FileAskError> {
    let request = Msg::from(Content::DoWriteFile(DoWriteFileArgs {
        id: file.id,
        sig: file.sig,
        // The request owns its payload, so the borrowed bytes are copied.
        contents: contents.to_vec(),
    }));

    // `?` converts the `AskError` into `FileAskError` through `From`.
    match self.ask(request).await?.content {
        Content::FileWritten(args) => {
            // Store the signature returned in the reply on the local handle.
            file.sig = args.sig;
            Ok(args)
        }
        unexpected => Err(make_file_ask_error(unexpected)),
    }
}
/// Starts `command` with `args` by delegating to
/// `ask_exec_proc_with_options` with the stdin, stdout and stderr flags all
/// set to `true` and no current directory.
///
/// # Errors
///
/// Returns whatever `ask_exec_proc_with_options` returns.
pub async fn ask_exec_proc(
    &mut self,
    command: String,
    args: Vec<String>,
) -> Result<ProcStartedArgs, ExecAskError> {
    let (stdin, stdout, stderr) = (true, true, true);
    let current_dir = None;
    self.ask_exec_proc_with_options(
        command,
        args,
        stdin,
        stdout,
        stderr,
        current_dir,
    )
    .await
}
/// Starts `command` with `args` by delegating to
/// `ask_exec_proc_with_options` with the stdin, stdout and stderr flags all
/// set to `true` and `current_dir` passed as `Some(current_dir)`.
///
/// # Errors
///
/// Returns whatever `ask_exec_proc_with_options` returns.
pub async fn ask_exec_proc_with_current_dir(
    &mut self,
    command: String,
    args: Vec<String>,
    current_dir: String,
) -> Result<ProcStartedArgs, ExecAskError> {
    let (stdin, stdout, stderr) = (true, true, true);
    let working_dir = Some(current_dir);
    self.ask_exec_proc_with_options(
        command,
        args,
        stdin,
        stdout,
        stderr,
        working_dir,
    )
    .await
}
/// Sends a `DoExecProc` request and returns the arguments of the
/// `ProcStarted` reply.
///
/// `command`, `args`, the three stream flags and `current_dir` are forwarded
/// unchanged in the request.
///
/// # Errors
///
/// Returns the `ask` failure converted into `ExecAskError`, or the error
/// built by `make_exec_ask_error` when the reply is not
/// `Content::ProcStarted`.
pub async fn ask_exec_proc_with_options(
    &mut self,
    command: String,
    args: Vec<String>,
    stdin: bool,
    stdout: bool,
    stderr: bool,
    current_dir: Option<String>,
) -> Result<ProcStartedArgs, ExecAskError> {
    let request = Msg::from(Content::DoExecProc(DoExecProcArgs {
        command,
        args,
        stdin,
        stdout,
        stderr,
        current_dir,
    }));

    // `?` converts the `AskError` into `ExecAskError` through `From`.
    match self.ask(request).await?.content {
        Content::ProcStarted(args) => Ok(args),
        unexpected => Err(make_exec_ask_error(unexpected)),
    }
}
/// Sends a `DoWriteStdin` request carrying the `id` of `proc` plus a copy of
/// `input`, and returns the arguments of the `StdinWritten` reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `ExecAskError`, or the error
/// built by `make_exec_ask_error` when the reply is not
/// `Content::StdinWritten`.
pub async fn ask_write_stdin(
    &mut self,
    proc: &RemoteProc,
    input: &[u8],
) -> Result<StdinWrittenArgs, ExecAskError> {
    let request = Msg::from(Content::DoWriteStdin(DoWriteStdinArgs {
        id: proc.id,
        // The request owns its payload, so the borrowed bytes are copied.
        input: input.to_vec(),
    }));

    // `?` converts the `AskError` into `ExecAskError` through `From`.
    match self.ask(request).await?.content {
        Content::StdinWritten(args) => Ok(args),
        unexpected => Err(make_exec_ask_error(unexpected)),
    }
}
/// Sends a `DoGetStdout` request carrying the `id` of `proc` and returns the
/// arguments of the `StdoutContents` reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `ExecAskError`, or the error
/// built by `make_exec_ask_error` when the reply is not
/// `Content::StdoutContents`.
pub async fn ask_get_stdout(
    &mut self,
    proc: &RemoteProc,
) -> Result<StdoutContentsArgs, ExecAskError> {
    let request =
        Msg::from(Content::DoGetStdout(DoGetStdoutArgs { id: proc.id }));

    // `?` converts the `AskError` into `ExecAskError` through `From`.
    match self.ask(request).await?.content {
        Content::StdoutContents(args) => Ok(args),
        unexpected => Err(make_exec_ask_error(unexpected)),
    }
}
/// Sends a `DoGetStderr` request carrying the `id` of `proc` and returns the
/// arguments of the `StderrContents` reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `ExecAskError`, or the error
/// built by `make_exec_ask_error` when the reply is not
/// `Content::StderrContents`.
pub async fn ask_get_stderr(
    &mut self,
    proc: &RemoteProc,
) -> Result<StderrContentsArgs, ExecAskError> {
    let request =
        Msg::from(Content::DoGetStderr(DoGetStderrArgs { id: proc.id }));

    // `?` converts the `AskError` into `ExecAskError` through `From`.
    match self.ask(request).await?.content {
        Content::StderrContents(args) => Ok(args),
        unexpected => Err(make_exec_ask_error(unexpected)),
    }
}
/// Sends a `DoGetProcStatus` request carrying the `id` of `proc` and returns
/// the arguments of the `ProcStatus` reply.
///
/// # Errors
///
/// Returns the `ask` failure converted into `ExecAskError`, or the error
/// built by `make_exec_ask_error` when the reply is not
/// `Content::ProcStatus`.
pub async fn ask_proc_status(
    &mut self,
    proc: &RemoteProc,
) -> Result<ProcStatusArgs, ExecAskError> {
    let request = Msg::from(Content::DoGetProcStatus(DoGetProcStatusArgs {
        id: proc.id,
    }));

    // `?` converts the `AskError` into `ExecAskError` through `From`.
    match self.ask(request).await?.content {
        Content::ProcStatus(args) => Ok(args),
        unexpected => Err(make_exec_ask_error(unexpected)),
    }
}
/// Sends a `DoKillProc` request carrying the `id` of `proc` and returns the
/// arguments of the `ProcStatus` reply when it reports the process as no
/// longer alive.
///
/// # Errors
///
/// * `ExecAskError::FailedToKill` when the `ProcStatus` reply still has
///   `is_alive` set.
/// * The `ask` failure converted into `ExecAskError`.
/// * The error built by `make_exec_ask_error` when the reply is not
///   `Content::ProcStatus`.
pub async fn ask_proc_kill(
    &mut self,
    proc: &RemoteProc,
) -> Result<ProcStatusArgs, ExecAskError> {
    let request =
        Msg::from(Content::DoKillProc(DoKillProcArgs { id: proc.id }));

    // `?` converts the `AskError` into `ExecAskError` through `From`.
    match self.ask(request).await?.content {
        // A status reply that is still alive means the kill did not work.
        Content::ProcStatus(args) if args.is_alive => {
            Err(ExecAskError::FailedToKill)
        }
        Content::ProcStatus(args) => Ok(args),
        unexpected => Err(make_exec_ask_error(unexpected)),
    }
}
/// Sends an `InternalDebug` message with empty `input` and `output` and
/// returns the arguments of the `InternalDebug` reply.
///
/// # Errors
///
/// Propagates any `ask` failure, or returns the error built by
/// `make_ask_error` when the reply is not `Content::InternalDebug`.
pub async fn ask_internal_debug(
    &mut self,
) -> Result<InternalDebugArgs, AskError> {
    let request = Msg::from(Content::InternalDebug(InternalDebugArgs {
        input: Vec::new(),
        output: Vec::new(),
    }));

    self.ask(request)
        .await
        .and_then(|reply| match reply.content {
            Content::InternalDebug(args) => Ok(args),
            unexpected => Err(make_ask_error(unexpected)),
        })
}
}
/// Turns an unexpected reply content into a `FileAskError`.
///
/// `Content::IoError` becomes `FileAskError::IoError`; any other content is
/// passed to `make_ask_error` and the result converted into `FileAskError`.
fn make_file_ask_error(x: Content) -> FileAskError {
    match x {
        Content::IoError(io) => FileAskError::IoError(io.into()),
        other => FileAskError::from(make_ask_error(other)),
    }
}
/// Turns an unexpected reply content into an `ExecAskError`.
///
/// `Content::IoError` becomes `ExecAskError::IoError`; any other content is
/// passed to `make_ask_error` and the result converted into `ExecAskError`.
fn make_exec_ask_error(x: Content) -> ExecAskError {
    match x {
        Content::IoError(io) => ExecAskError::IoError(io.into()),
        other => ExecAskError::from(make_ask_error(other)),
    }
}
/// Wraps an unexpected reply content in `AskError::InvalidResponse`.
///
/// Every content value maps to the same variant, so no matching is needed.
fn make_ask_error(x: Content) -> AskError {
    AskError::InvalidResponse { content: x }
}