use crate::bin_error::{self, ContextExt as _};
use futures_util::StreamExt as _;
pub struct Agent {
timer_r: tokio::sync::mpsc::UnboundedReceiver<()>,
sync_timer_r: tokio::sync::mpsc::UnboundedReceiver<()>,
state: std::sync::Arc<tokio::sync::Mutex<crate::state::State>>,
}
impl Agent {
pub fn new(
timer_r: tokio::sync::mpsc::UnboundedReceiver<()>,
sync_timer_r: tokio::sync::mpsc::UnboundedReceiver<()>,
state: std::sync::Arc<tokio::sync::Mutex<crate::state::State>>,
) -> Self {
Self {
timer_r,
sync_timer_r,
state,
}
}
pub async fn run(
self,
listener: tokio::net::UnixListener,
) -> bin_error::Result<()> {
pub enum Event {
Request(std::io::Result<tokio::net::UnixStream>),
Timeout(()),
Sync(()),
}
let notifications = self
.state
.lock()
.await
.notifications_handler
.get_channel()
.await;
let notifications =
tokio_stream::wrappers::UnboundedReceiverStream::new(
notifications,
)
.map(|message| match message {
crate::notifications::Message::Logout => Event::Timeout(()),
crate::notifications::Message::Sync => Event::Sync(()),
})
.boxed();
let mut stream = futures_util::stream::select_all([
tokio_stream::wrappers::UnixListenerStream::new(listener)
.map(Event::Request)
.boxed(),
tokio_stream::wrappers::UnboundedReceiverStream::new(
self.timer_r,
)
.map(Event::Timeout)
.boxed(),
tokio_stream::wrappers::UnboundedReceiverStream::new(
self.sync_timer_r,
)
.map(Event::Sync)
.boxed(),
notifications,
]);
while let Some(event) = stream.next().await {
match event {
Event::Request(res) => {
let stream =
res.context("failed to accept incoming connection")?;
if let Err(e) = crate::sock::check_peer_uid(&stream) {
log::warn!("rejecting connection: {e:#}");
continue;
}
let mut sock = crate::sock::Sock::new(stream);
let state = self.state.clone();
tokio::spawn(async move {
let res =
handle_request(&mut sock, state.clone()).await;
if let Err(e) = res {
sock.send(&bwx::protocol::Response::Error {
error: format!("{e:#}"),
})
.await
.unwrap();
}
});
}
Event::Timeout(()) => {
self.state.lock().await.clear();
}
Event::Sync(()) => {
let state = self.state.clone();
tokio::spawn(async move {
if let Err(e) =
crate::actions::sync(None, state.clone()).await
{
eprintln!("failed to sync: {e:#}");
}
});
self.state.lock().await.set_sync_timeout();
}
}
}
Ok(())
}
}
const RECV_TIMEOUT: std::time::Duration =
std::time::Duration::from_secs(30);
async fn handle_request(
sock: &mut crate::sock::Sock,
state: std::sync::Arc<tokio::sync::Mutex<crate::state::State>>,
) -> bin_error::Result<()> {
let req = if let Ok(r) =
tokio::time::timeout(RECV_TIMEOUT, sock.recv()).await
{
r?
} else {
let _ = sock
.send(&bwx::protocol::Response::Error {
error: "request read timed out".to_string(),
})
.await;
return Ok(());
};
let req = match req {
Ok(msg) => msg,
Err(error) => {
sock.send(&bwx::protocol::Response::Error { error }).await?;
return Ok(());
}
};
let (action, environment, session_id, purpose) = req.into_parts();
let set_timeout = match &action {
bwx::protocol::Action::Register => {
crate::actions::register(sock, &environment).await?;
true
}
bwx::protocol::Action::Login => {
crate::actions::login(sock, state.clone(), &environment).await?;
true
}
bwx::protocol::Action::Unlock => {
crate::actions::unlock(sock, state.clone(), &environment).await?;
true
}
bwx::protocol::Action::CheckLock => {
crate::actions::check_lock(sock, state.clone()).await?;
false
}
bwx::protocol::Action::Lock => {
crate::actions::lock(sock, state.clone()).await?;
state.lock().await.clear_touchid_sessions();
false
}
bwx::protocol::Action::Sync => {
crate::actions::sync(Some(sock), state.clone()).await?;
false
}
bwx::protocol::Action::Decrypt {
cipherstring,
entry_key,
org_id,
} => {
let cipherstring = cipherstring.clone();
let entry_key = entry_key.clone();
let org_id = org_id.clone();
crate::actions::decrypt(
sock,
state.clone(),
&environment,
&cipherstring,
entry_key.as_deref(),
org_id.as_deref(),
session_id.as_deref(),
purpose.as_deref(),
)
.await?;
true
}
bwx::protocol::Action::DecryptBatch { items } => {
let items = items.clone();
crate::actions::decrypt_batch(
sock,
state.clone(),
&environment,
items,
session_id.as_deref(),
purpose.as_deref(),
)
.await?;
true
}
bwx::protocol::Action::Encrypt { plaintext, org_id } => {
crate::actions::encrypt(
sock,
state.clone(),
plaintext,
org_id.as_deref(),
session_id.as_deref(),
purpose.as_deref(),
)
.await?;
true
}
bwx::protocol::Action::ClipboardStore { text } => {
crate::actions::clipboard_store(
sock,
state.clone(),
text,
session_id.as_deref(),
purpose.as_deref(),
)
.await?;
true
}
bwx::protocol::Action::Quit => std::process::exit(0),
bwx::protocol::Action::Version => {
crate::actions::version(sock).await?;
false
}
bwx::protocol::Action::TouchIdEnroll => {
crate::actions::touchid_enroll(sock, state.clone()).await?;
true
}
bwx::protocol::Action::TouchIdDisable => {
crate::actions::touchid_disable(sock).await?;
false
}
bwx::protocol::Action::TouchIdStatus => {
crate::actions::touchid_status(sock).await?;
false
}
};
let mut state = state.lock().await;
state.set_last_environment(environment);
if set_timeout {
state.set_timeout();
}
Ok(())
}