use crate::prelude::*;
use crate::commands::*;
pub struct MgmtChannel<R:Read, W:Write> {
pub read: FrameReader<R>,
pub write: FrameWriter<W>,
}
impl<R,W> Debug for MgmtChannel<R,W> where R: Read, W: Write {
#[throws(fmt::Error)]
fn fmt(&self, f: &mut fmt::Formatter) {
f.write_str("MgmtChannel{...}")?
}
}
pub type ClientMgmtChannel = MgmtChannel<
Box<dyn ReadDebug + Send + 'static>,
Box<dyn WriteDebug + Send + 'static>,
>;
pub trait ReadDebug: Read + Debug { }
pub trait WriteDebug: Write + Debug { }
impl<T> ReadDebug for T where T: Read + Debug { }
impl<T> WriteDebug for T where T: Write + Debug { }
impl ClientMgmtChannel {
#[throws(AE)]
pub fn connect(socket_path: &str) -> Self {
let unix = UnixStream::connect(socket_path)
.with_context(||socket_path.to_owned())
.context("connect to server")?;
let read = unix.try_clone().context("dup the client connection")?;
let write = unix;
MgmtChannel::new_boxed(read, write)
}
pub fn new_boxed<R,W>(read: R, write: W) -> Self
where R: ReadDebug + Send + 'static,
W: WriteDebug + Send + 'static {
MgmtChannel::new_raw(Box::new(read), Box::new(write))
}
}
impl MgmtChannel<TimedFdReader,TimedFdWriter> {
#[throws(AE)]
pub fn new_timed<U>(conn: U) -> Self
where U: IoTryClone + Read + Write + IntoRawFd + Send + 'static,
{
let read = conn.try_clone().context("dup the command stream")?;
let read = TimedFdReader::new(read).context("set up timed reader")?;
let write = TimedFdWriter::new(conn).context("set up timed writerr")?;
MgmtChannel::new_raw(read, write)
}
}
impl<R,W> MgmtChannel<R,W> where R: Read, W: Write + Send {
pub fn new_raw(read: R, write: W) -> Self {
let read = FrameReader::new(read);
let write = FrameWriter::new(write);
MgmtChannel { read, write }
}
pub fn read_inner_mut(&mut self) -> &mut R {
self.read.inner_mut()
}
}
impl ClientMgmtChannel {
pub const PROGRESS: ProgressUpdateMode = PUM::Duplex;
#[throws(AE)]
pub fn cmd_withbulk<U,D>(&mut self, cmd: &MgmtCommand,
up: &mut U, down: &mut D,
progress: &mut dyn termprogress::Reporter)
-> MgmtResponse
where U: Read + Send, D: Write,
{
use MgmtResponse::*;
let mut wbulk = self.write
.write_withbulk().context("start sending command")?
.respond(&cmd).context("send command")?;
let read = &mut self.read;
let (resp, mut rbulk) = crossbeam_utils::thread::scope(|scope| {
let thr = scope.spawn(move |_| {
io::copy(up, &mut wbulk).context("copy")?;
wbulk.finish().context("finish")?;
Ok::<_,AE>(())
});
let (mut resp, mut rbulk) =
read.read_withbulk()
.context("failed to read response")?;
while let MR::Progress(pi) = resp {
resp = (&mut rbulk).read_rmp()?;
progress.report(&pi);
}
let r = thr.join().expect("bulk data upload thread paniced");
if let Err(e) = r {
progress.clear();
warn!("bulk data upload failed: {}", e);
}
Ok::<_,AE>((resp, rbulk))
})
.expect("bulk data upload thread panicked, not reaped")
?;
progress.clear();
match &resp {
Progress(_) => panic!(),
Fine | AccountsList{..} | GamesList{..} |
Libraries(_) | LibraryItems(_) | Bundles{..} | Bundle{..} => { },
SshKeys(..) | SshKeyAdded{..} | ThisConnAuthBy{..} => { },
AlterGame { error: None, .. } => { },
Error { error } => {
Err(error.clone()).context(
format!("got error response to: {:?}",&cmd)
)?;
},
AlterGame { error: Some(error), ref responses } => {
if let MgmtCommand::AlterGame { insns, .. } = &cmd {
if responses.len() < insns.len() {
Err(error.clone())
.with_context(|| format!("{:?}", &insns[responses.len()]))
.context("AlterGame insn failed")?;
}
}
Err(error.clone()).context(format!(
"game alterations failed (maybe partially); response to: {:?}",
&cmd
))?;
}
};
io::copy(&mut rbulk, down).context("copy bulk download")?;
resp
}
#[throws(AE)]
pub fn cmd(&mut self, cmd: &MgmtCommand) -> MgmtResponse {
self.cmd_withbulk(cmd, &mut io::empty(), &mut io::sink(),
&mut termprogress::Null)?
}
pub fn for_game(self, game: InstanceName, how: MgmtGameUpdateMode)
-> MgmtChannelForGame {
MgmtChannelForGame {
chan: self,
game, how
}
}
}
pub trait IoTryClone: Sized {
fn try_clone(&self) -> io::Result<Self>;
}
impl IoTryClone for UnixStream {
fn try_clone(&self) -> io::Result<UnixStream> { self.try_clone() }
}
#[derive(Debug,Deref,DerefMut)]
pub struct MgmtChannelForGame {
#[deref] #[deref_mut] pub chan: ClientMgmtChannel,
pub game: InstanceName,
pub how: MgmtGameUpdateMode,
}
impl MgmtChannelForGame {
#[throws(AE)]
pub fn alter_game(&mut self, insns: Vec<MgmtGameInstruction>,
f: Option<&mut dyn FnMut(&MgmtGameResponse) -> Result<(),AE>>)
-> Vec<MgmtGameResponse> {
let insns_len = insns.len();
let cmd = MgmtCommand::AlterGame {
game: self.game.clone(), how: self.how,
insns
};
let responses = match self.cmd(&cmd)? {
MgmtResponse::AlterGame { error: None, responses }
if responses.len() == insns_len ||
responses.iter().any(|r| matches!(r, MGR::InsnExpanded)) => {
responses
},
wat => Err(anyhow!("unexpected AlterGame response: {:?} => {:?}",
&cmd, &wat))?,
};
if let Some(f) = f {
for response in &responses {
f(response)?;
}
}
responses
}
#[throws(AE)]
pub fn info(&mut self) -> MgmtGameResponseGameInfo {
let resp = self.alter_game(vec![MGI::Info], None)?;
match &resp[..] {
[MGR::Info(info)] => info.clone(),
x => throw!(anyhow!("unexpected response to game Info: {:?}", &x)),
}
}
#[throws(AE)]
pub fn has_player(&mut self, account: &AccountName)
-> Option<(PlayerId, MgmtPlayerInfo)>
{
let players = {
let MgmtGameResponseGameInfo { players, .. } = self.info()?;
players
};
players.into_iter().find(
|(_,mpi)| &mpi.account == account
)
}
#[throws(AE)]
pub fn list_pieces(&mut self) -> (Vec<MgmtGamePieceInfo>, BTreeSet<String>) {
let insns = vec![ MGI::ListPieces ];
let mut responses = self.alter_game(insns, None)?;
match responses.as_mut_slice() {
[MGR::Pieces { pieces, pcaliases }] => return (
mem::take(pieces),
mem::take(pcaliases),
),
wat => Err(anyhow!("ListPieces => {:?}", &wat))?,
}
}
#[throws(AE)]
pub fn list_items(&mut self, lib: Option<String>, pat: String)
-> Vec<ItemEnquiryData> {
let cmd = MgmtCommand::LibraryListByGlob {
game: self.game.clone(),
lib, pat,
};
let mut items = match self.cmd(&cmd)? {
MgmtResponse::LibraryItems(items) => items,
wat => Err(anyhow!("unexpected LibraryListByGlob response: {:?}",
&wat))?,
};
items.sort();
items
}
}