use crate::lair_api::traits::AsLairCodec;
use crate::*;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::StreamExt;
use parking_lot::RwLock;
use std::future::Future;
use std::sync::atomic;
use std::sync::Arc;
pub mod traits {
use super::*;
pub type RawSend = Box<dyn tokio::io::AsyncWrite + 'static + Send + Unpin>;
pub type RawRecv = Box<dyn tokio::io::AsyncRead + 'static + Send + Unpin>;
pub trait AsLairServer: 'static + Send + Sync {
fn accept(
&self,
send: RawSend,
recv: RawRecv,
) -> BoxFuture<'static, LairResult<()>>;
fn store(&self) -> BoxFuture<'static, LairResult<LairStore>>;
}
}
use traits::*;
#[derive(Clone)]
pub struct LairServer(pub Arc<dyn AsLairServer>);
impl LairServer {
pub fn accept<S, R>(
&self,
send: S,
recv: R,
) -> impl Future<Output = LairResult<()>> + 'static + Send
where
S: tokio::io::AsyncWrite + 'static + Send + Unpin,
R: tokio::io::AsyncRead + 'static + Send + Unpin,
{
AsLairServer::accept(&*self.0, Box::new(send), Box::new(recv))
}
pub fn store(
&self,
) -> impl Future<Output = LairResult<LairStore>> + 'static + Send {
AsLairServer::store(&*self.0)
}
}
pub fn spawn_lair_server_task<C>(
config: C,
server_name: Arc<str>,
server_version: Arc<str>,
store_factory: LairStoreFactory,
passphrase: sodoken::BufRead,
) -> impl Future<Output = LairResult<LairServer>> + 'static + Send
where
C: Into<LairServerConfig> + 'static + Send,
{
async move {
let srv = Srv::new(
config.into(),
server_name,
server_version,
store_factory,
passphrase,
)
.await?;
Ok(LairServer(Arc::new(srv)))
}
}
mod priv_srv;
use priv_srv::*;
mod priv_api;
use priv_api::*;
#[derive(Clone)]
pub(crate) struct FallbackCmd {
req: tokio::sync::mpsc::Sender<(
LairApiReqSignByPubKey,
tokio::sync::oneshot::Sender<LairResult<LairApiResSignByPubKey>>,
)>,
child: Arc<tokio::process::Child>,
}
impl FallbackCmd {
pub(crate) async fn new(config: &LairServerConfig) -> LairResult<Self> {
let (program, args) = match &config.signature_fallback {
LairServerSignatureFallback::Command { program, args } => {
(program.clone(), args.clone())
}
oth => {
return Err(format!(
"invalid signature_fallback type: {:?}",
oth,
)
.into());
}
};
let program = dunce::canonicalize(program)?;
let args = args.unwrap_or_else(Vec::new);
let mut child = tokio::process::Command::new(program)
.args(args)
.kill_on_drop(true)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit())
.spawn()?;
let mut stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let (send, mut recv) = tokio::sync::mpsc::channel::<(
LairApiReqSignByPubKey,
tokio::sync::oneshot::Sender<LairResult<LairApiResSignByPubKey>>,
)>(4096);
use std::collections::HashMap;
struct Inner {
p: HashMap<
Arc<str>,
tokio::sync::oneshot::Sender<
LairResult<LairApiResSignByPubKey>,
>,
>,
}
use parking_lot::Mutex;
let inner = Arc::new(Mutex::new(Inner { p: HashMap::new() }));
let inner2 = inner.clone();
tokio::task::spawn(async move {
while let Some((req, res)) = recv.recv().await {
inner2.lock().p.insert(req.msg_id.clone(), res);
let pub_key = base64::encode_config(
&*req.pub_key.0,
base64::URL_SAFE_NO_PAD,
);
let data = base64::encode(req.data);
let output = format!(
"{}\n",
serde_json::to_string(&serde_json::json!({
"msgId": req.msg_id,
"pubKey": pub_key,
"dataToSign": data,
}))
.unwrap(),
);
use tokio::io::AsyncWriteExt;
if let Err(e) = stdin.write_all(output.as_bytes()).await {
tracing::error!("signature_fallback write error: {:?}", e);
return;
}
}
});
tokio::task::spawn(async move {
use tokio::io::AsyncBufReadExt;
let stdout = tokio::io::BufReader::new(stdout);
let mut lines = stdout.lines();
while let Ok(Some(line)) = lines.next_line().await {
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct Res {
msg_id: Arc<str>,
signature: Option<String>,
error: Option<String>,
}
let res: Res = match serde_json::from_str(&line) {
Err(e) => {
tracing::error!(
"signature_fallback read error: {:?}",
e
);
return;
}
Ok(r) => r,
};
let respond = inner.lock().p.remove(&res.msg_id);
if let Some(respond) = respond {
if let Some(error) = res.error {
let _ = respond.send(Err(error.into()));
} else if let Some(signature) = res.signature {
let signature = match base64::decode(&signature) {
Ok(s) => s,
Err(e) => {
tracing::error!(
"signature_fallback read error: {:?}",
e
);
return;
}
};
if signature.len() != 64 {
tracing::error!("signature_fallback read error: invalid signature size");
return;
}
let mut sized_sig = [0; 64];
sized_sig.copy_from_slice(&signature);
let _ = respond.send(Ok(LairApiResSignByPubKey {
msg_id: res.msg_id,
signature: sized_sig.into(),
}));
}
}
}
});
Ok(Self {
req: send,
child: Arc::new(child),
})
}
pub(crate) fn sign_by_pub_key(
&self,
req: LairApiReqSignByPubKey,
) -> impl Future<Output = LairResult<LairApiResSignByPubKey>> + 'static + Send
{
let send = self.req.clone();
async move {
let (s, r) = tokio::sync::oneshot::channel();
send.send((req, s))
.await
.map_err(|_| one_err::OneErr::new("no fallback cmd task"))?;
r.await
.map_err(|_| one_err::OneErr::new("no fallback cmd task"))?
}
}
}