pub mod command;
use super::peer::AllowedIP;
use super::{Connection, Device, Reconfigure};
use crate::device::{DeviceTransports, PeerUpdateRequest};
use crate::serialization::KeyBytes;
use command::{Get, GetPeer, GetResponse, Peer, Request, Response, Set, SetPeer, SetResponse};
use eyre::{Context, bail, eyre};
use libc::EINVAL;
use std::fmt::Debug;
use std::io::{BufRead, BufReader, Read, Write};
use std::str::FromStr;
use std::sync::{Weak, atomic};
use tokio::sync::{RwLock, mpsc, oneshot};
#[cfg(unix)]
const SOCK_DIR: &str = "/var/run/wireguard/";
pub struct ApiServer {
rx: mpsc::Receiver<(Request, oneshot::Sender<Response>)>,
}
#[derive(Clone)]
pub struct ApiClient {
tx: mpsc::Sender<(Request, oneshot::Sender<Response>)>,
}
impl ApiClient {
pub async fn send(&self, request: impl Into<Request>) -> eyre::Result<Response> {
let request = request.into();
log::trace!("Handling API request: {request:?}");
let (response_tx, response_rx) = oneshot::channel();
self.tx
.send((request, response_tx))
.await
.map_err(|_| eyre!("Channel closed"))?;
response_rx
.await
.inspect(|response| log::trace!("Sending API response: {response:?}"))
.map_err(|_| eyre!("Channel closed"))
}
pub fn send_sync(&self, request: impl Into<Request>) -> eyre::Result<Response> {
let request = request.into();
log::trace!("Handling API request: {request:?}");
let (response_tx, response_rx) = oneshot::channel();
self.tx
.blocking_send((request, response_tx))
.map_err(|_| eyre!("Channel closed"))?;
response_rx
.blocking_recv()
.inspect(|response| log::trace!("Sending API response: {response:?}"))
.map_err(|_| eyre!("Channel closed"))
}
}
impl ApiClient {
pub fn wrap_read_write<RW>(self, rw: RW)
where
for<'a> &'a RW: Read + Write,
RW: Send + Sync + 'static,
{
std::thread::spawn(move || {
let r = BufReader::new(&rw);
let make_request = |s: &str| {
let request = Request::from_str(s).wrap_err("Failed to parse command")?;
let Some(response) = self.send_sync(request).ok() else {
bail!("Server hung up");
};
log::info!("{:?}", response.to_string());
if let Err(e) = writeln!(&rw, "{response}") {
log::error!("Failed to write API response: {e}");
}
Ok(())
};
let mut lines = String::new();
for line in r.lines() {
let Ok(line) = line else {
if !lines.is_empty()
&& let Err(e) = make_request(&lines)
{
log::error!("Failed to handle UAPI request: {e:#}");
return;
}
return;
};
if !line.is_empty() {
lines.push_str(&line);
lines.push('\n');
continue;
}
if lines.is_empty() {
continue;
}
if let Err(e) = make_request(&lines) {
log::error!("Failed to handle UAPI request: {e:#}");
return;
}
lines.clear();
}
});
}
}
impl ApiServer {
pub fn new() -> (ApiClient, ApiServer) {
let (tx, rx) = mpsc::channel(100);
(ApiClient { tx }, ApiServer { rx })
}
#[cfg(unix)]
pub fn default_unix_socket(name: &str) -> eyre::Result<Self> {
use std::os::unix::net::UnixListener;
let path = format!("{SOCK_DIR}/{name}.sock");
create_sock_dir();
let _ = std::fs::remove_file(&path);
let api_listener =
UnixListener::bind(&path).map_err(|e| eyre!("Failed to bidd unix socket: {e}"))?;
let (tx, rx) = ApiServer::new();
std::thread::spawn(move || {
loop {
let Ok((stream, _)) = api_listener.accept() else {
break;
};
log::info!("New UAPI connection on unix socket");
tx.clone().wrap_read_write(stream);
}
});
Ok(rx)
}
pub fn from_read_write<RW>(rw: RW) -> Self
where
RW: Send + Sync + 'static,
for<'a> &'a RW: Read + Write,
{
let (tx, rx) = Self::new();
tx.wrap_read_write(rw);
rx
}
pub(crate) async fn recv(&mut self) -> Option<(Request, oneshot::Sender<Response>)> {
let (request, response_tx) = self.rx.recv().await?;
Some((request, response_tx))
}
}
impl Debug for ApiServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("ApiServer").finish()
}
}
#[cfg(unix)]
fn create_sock_dir() {
use super::drop_privileges::get_saved_ids;
let _ = std::fs::create_dir(SOCK_DIR);
if let Ok((saved_uid, saved_gid)) = get_saved_ids() {
unsafe {
let c_path = std::ffi::CString::new(SOCK_DIR).unwrap();
libc::chown(
c_path.as_bytes_with_nul().as_ptr().cast(),
saved_uid,
saved_gid,
);
}
}
}
impl<T: DeviceTransports> Device<T> {
pub(super) async fn handle_api(device: Weak<RwLock<Self>>, mut api: ApiServer) {
loop {
let Some((request, respond)) = api.recv().await else {
return;
};
let Some(device) = device.upgrade() else {
return;
};
let response = match request {
Request::Get(get) => {
let device_guard = device.read().await;
Response::Get(on_api_get(get, &device_guard).await)
}
Request::Set(set) => {
let mut device_guard = device.write().await;
let (response, reconfigure) = on_api_set(set, &mut device_guard).await;
drop(device_guard);
if reconfigure == Reconfigure::Yes {
match Connection::set_up(device.clone()).await {
Ok(con) => {
let mut device_guard = device.write().await;
device_guard.connection = Some(con);
Response::Set(response)
}
Err(err) => {
log::error!("Failed to set up stuff: {err}");
Response::Set(SetResponse { errno: EINVAL })
}
}
} else {
Response::Set(response)
}
} };
let _ = respond.send(response);
}
}
}
async fn on_api_get(_: Get, d: &Device<impl DeviceTransports>) -> GetResponse {
let mut peers = vec![];
for (public_key, peer) in &d.peers {
let peer = peer.lock().await;
let (_, tx_bytes, rx_bytes, ..) = peer.tunnel.stats();
let endpoint = peer.endpoint().addr;
let padding_overhead = peer.daita.as_ref().map(|daita| daita.padding_overhead());
peers.push(GetPeer {
peer: Peer {
public_key: KeyBytes(*public_key.as_bytes()),
preshared_key: None, endpoint,
persistent_keepalive_interval: peer.persistent_keepalive(),
allowed_ip: peer
.allowed_ips()
.map(|(addr, cidr)| AllowedIP { addr, cidr })
.collect(),
},
last_handshake_time_sec: peer.time_since_last_handshake().map(|d| d.as_secs()),
last_handshake_time_nsec: peer.time_since_last_handshake().map(|d| d.subsec_nanos()),
rx_bytes: Some(rx_bytes as u64),
tx_bytes: Some(tx_bytes as u64),
tx_padding_bytes: padding_overhead.map(|p| p.tx_padding_bytes as u64),
tx_padding_packet_bytes: padding_overhead
.map(|p| p.tx_padding_packet_bytes.load(atomic::Ordering::SeqCst) as u64),
rx_padding_bytes: padding_overhead.map(|p| p.rx_padding_bytes as u64),
rx_padding_packet_bytes: padding_overhead.map(|p| p.rx_padding_packet_bytes as u64),
});
}
GetResponse {
private_key: d.key_pair.as_ref().map(|k| KeyBytes(k.0.to_bytes())),
listen_port: Some(
d.connection
.as_ref()
.and_then(|con| con.listen_port)
.unwrap_or(0),
),
fwmark: d.fwmark,
peers,
errno: 0,
}
}
async fn on_api_set(
set: Set,
device: &mut Device<impl DeviceTransports>,
) -> (SetResponse, Reconfigure) {
let Set {
private_key,
listen_port,
fwmark,
replace_peers,
protocol_version,
peers,
} = set;
if let Some(protocol_version) = protocol_version
&& protocol_version != "1"
{
log::warn!("Invalid API protocol version: {protocol_version}");
return (SetResponse { errno: EINVAL }, Reconfigure::No);
}
let mut reconfigure: Reconfigure = Reconfigure::No;
if replace_peers {
device.clear_peers();
}
if let Some(private_key) = private_key {
reconfigure |= device
.set_key(x25519_dalek::StaticSecret::from(private_key.0))
.await;
}
if let Some(listen_port) = listen_port {
reconfigure |= device.set_port(listen_port);
}
if let Some(fwmark) = fwmark {
#[cfg(target_os = "linux")]
if device.set_fwmark(fwmark).is_err() {
return (
SetResponse {
errno: libc::EADDRINUSE,
},
reconfigure,
);
}
#[cfg(not(target_os = "linux"))]
let _ = fwmark;
}
let mut pending_peer_updates = vec![];
for peer in peers {
let SetPeer {
peer:
Peer {
public_key,
preshared_key,
endpoint,
persistent_keepalive_interval,
allowed_ip,
},
remove,
update_only,
replace_allowed_ips,
daita_settings,
} = peer;
let public_key = x25519_dalek::PublicKey::from(public_key.0);
if update_only && !device.peers.contains_key(&public_key) {
continue;
}
let preshared_key = preshared_key.map(|psk| match psk {
command::SetUnset::Set(psk) => psk.0,
command::SetUnset::Unset => todo!("not sure how to handle this"),
});
let daita_settings = match daita_settings {
Some(daita_settings) => {
reconfigure |= Reconfigure::Yes;
match crate::device::daita::DaitaSettings::try_from(daita_settings) {
Ok(settings) => Some(settings),
Err(e) => {
log::error!("Invalid DAITA settings: {e}");
return (SetResponse { errno: EINVAL }, Reconfigure::No);
}
}
}
None => None,
};
let update_peer = PeerUpdateRequest {
public_key,
remove,
replace_allowed_ips,
endpoint,
new_allowed_ips: allowed_ip,
keepalive: persistent_keepalive_interval,
preshared_key,
daita_settings,
};
pending_peer_updates.push(update_peer);
}
for update_peer in pending_peer_updates {
device.update_peer(update_peer).await;
}
(SetResponse { errno: 0 }, reconfigure)
}