use std::os::fd::RawFd;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use block2::RcBlock;
use dispatch2::{DispatchQueue, DispatchRetained, DispatchTime};
use objc2::rc::Retained;
use objc2::runtime::ProtocolObject;
use objc2::AllocAnyThread;
use objc2_foundation::NSError;
use objc2_virtualization::{
VZVirtioSocketDevice, VZVirtioSocketListener, VZVirtualMachine, VZVirtualMachineDelegate,
};
use crate::desktop::{self, Action, ResponseHeader};
use crate::error::Error;
use crate::vz::config::{build as build_vz_config, resolve_vsock_port};
use crate::vz::delegate::{DelegateState, VmetteDelegate};
use crate::vz::vsock::{ListenerMode, ListenerState, VsockLogger};
use crate::{cmdline, Config, ShareMount, WorkloadStrategy};
/// Upper bound on how long `AgentConn::fd` blocks waiting for the guest
/// agent's connection descriptor to arrive on the channel.
const AGENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
/// Receive timeout installed (via `SO_RCVTIMEO`) on the agent connection
/// right after its descriptor is obtained.
const AGENT_READ_TIMEOUT: Duration = Duration::from_secs(30);
/// Wrapper around a `Retained<T>` that is unconditionally `Send` and `Sync`,
/// so the wrapped object can be captured by closures handed to a dispatch
/// queue or stored in handles that cross threads.
///
/// NOTE(review): the name suggests the wrapped object is only ever touched on
/// the dispatch queue it belongs to; nothing in the type enforces that, so
/// every use site has to uphold it — confirm when adding new uses.
struct QueueBound<T>(Retained<T>);
// SAFETY: NOTE(review) — sound only if the wrapped object is exclusively
// accessed from its owning queue (see the type-level note); not checked by
// the compiler.
unsafe impl<T> Send for QueueBound<T> {}
// SAFETY: NOTE(review) — same assumption as the `Send` impl above.
unsafe impl<T> Sync for QueueBound<T> {}
/// Gives transparent access to the wrapped object's methods.
impl<T> std::ops::Deref for QueueBound<T> {
type Target = T;
fn deref(&self) -> &T {
&self.0
}
}
/// Terminal state of a session, as reported by [`EndSlot`].
#[derive(Debug, Clone)]
pub enum SessionEnd {
    /// The session ended with an exit code.
    ///
    /// NOTE(review): presumably the guest workload's exit status; the code
    /// that produces this variant lives outside this file section.
    Exited(i32),
    /// The VM was stopped because the configured timeout fired.
    TimedOut,
    /// The VM was stopped on request.
    Stopped,
    /// The session failed; the string is a human-readable description.
    Error(String),
}

/// Write-once cell holding the session's end state, with blocking wait.
pub(crate) struct EndSlot {
    /// `None` until the first call to [`EndSlot::set`].
    end: Mutex<Option<SessionEnd>>,
    /// Signalled on every `set` call to wake threads blocked in `wait_end`.
    cv: Condvar,
}

impl EndSlot {
    /// Creates an empty, shareable slot.
    fn new() -> Arc<Self> {
        let slot = Self {
            end: Mutex::default(),
            cv: Condvar::default(),
        };
        Arc::new(slot)
    }

    /// Records `e` as the end state unless one is already recorded, then
    /// wakes all waiters. Only the first value ever stored is kept.
    pub(crate) fn set(&self, e: SessionEnd) {
        let mut state = self.end.lock().unwrap();
        // Keeps an existing value untouched; stores `e` only into an empty slot.
        let _ = state.get_or_insert(e);
        // Notify while still holding the lock, as waiters re-check under it.
        self.cv.notify_all();
    }

    /// Blocks until an end state has been recorded and returns a copy of it.
    fn wait_end(&self) -> SessionEnd {
        let settled = self
            .cv
            .wait_while(self.end.lock().unwrap(), |state| state.is_none())
            .unwrap();
        settled.clone().unwrap()
    }

    /// Returns `true` once an end state has been recorded.
    fn is_set(&self) -> bool {
        let state = self.end.lock().unwrap();
        state.is_some()
    }
}
/// Host-side state for talking to the guest agent over a connected socket
/// descriptor that is delivered through a channel.
pub(crate) struct AgentConn {
/// Workload strategy of the session; `request` refuses anything other than
/// `WorkloadStrategy::Agent`.
workload: WorkloadStrategy,
/// Receiving end of the channel on which agent connection descriptors
/// arrive; `None` when no agent channel was created for the session.
rx: Mutex<Option<Receiver<RawFd>>>,
/// Descriptor of the current agent connection, cached after it has been
/// received from `rx`; cleared (and closed) when a request fails.
fd: Mutex<Option<RawFd>>,
/// Held while a request/response exchange is in progress on the connection.
io: Mutex<()>,
}
impl AgentConn {
fn request(&self, action: &Action) -> Result<(ResponseHeader, Vec<u8>), Error> {
if self.workload != WorkloadStrategy::Agent {
return Err(Error::Vsock(
"request() is only valid for Agent-workload sessions".into(),
));
}
let fd = self.fd()?;
let _io = self.io.lock().unwrap();
let mut stream = FdStream(fd);
let outcome = desktop::send_action(&mut stream, action)
.and_then(|()| desktop::read_response(&mut stream));
match outcome {
Ok((header, payload)) => Ok((header, payload)),
Err(e) => {
self.invalidate_fd();
Err(e.into())
}
}
}
fn invalidate_fd(&self) {
if let Some(fd) = self.fd.lock().unwrap().take() {
unsafe { libc::close(fd) };
}
}
fn fd(&self) -> Result<RawFd, Error> {
let mut cached = self.fd.lock().unwrap();
if let Some(fd) = *cached {
return Ok(fd);
}
let rx_guard = self.rx.lock().unwrap();
let rx = rx_guard
.as_ref()
.ok_or_else(|| Error::Vsock("no agent vsock channel (vsock disabled?)".into()))?;
let fd = rx
.recv_timeout(AGENT_CONNECT_TIMEOUT)
.map_err(|_| Error::Vsock("timed out waiting for the guest agent to connect".into()))?;
set_recv_timeout(fd, AGENT_READ_TIMEOUT);
*cached = Some(fd);
Ok(fd)
}
}
/// Installs `dur` as the receive timeout (`SO_RCVTIMEO`) on socket `fd`.
///
/// Best effort: the result of `setsockopt` is not inspected, so a failure
/// leaves the socket with whatever timeout it had before.
fn set_recv_timeout(fd: RawFd, dur: Duration) {
    let (secs, micros) = (dur.as_secs(), dur.subsec_micros());
    let timeout = libc::timeval {
        tv_sec: secs as libc::time_t,
        tv_usec: micros as libc::suseconds_t,
    };
    let optval = (&timeout as *const libc::timeval).cast::<libc::c_void>();
    let optlen = std::mem::size_of_val(&timeout) as libc::socklen_t;
    // SAFETY: `optval` points at a live `timeval` on this stack frame and
    // `optlen` is exactly its size; the kernel only reads from it.
    unsafe {
        libc::setsockopt(fd, libc::SOL_SOCKET, libc::SO_RCVTIMEO, optval, optlen);
    }
}
/// Closes the cached connection descriptor and every descriptor still queued
/// on the channel.
impl Drop for AgentConn {
    fn drop(&mut self) {
        // `&mut self` proves exclusive access, so no locking is needed.
        // Poisoning is deliberately ignored: panicking inside `drop` would
        // abort the process when already unwinding, and would leak the
        // descriptors below.
        let cached = self
            .fd
            .get_mut()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Some(fd) = cached.take() {
            // SAFETY: the descriptor was owned by the cache and has just been
            // taken out of it, so this is its only close.
            unsafe { libc::close(fd) };
        }
        let pending = self
            .rx
            .get_mut()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Some(rx) = pending.take() {
            // Descriptors that were delivered but never picked up would
            // otherwise stay open for the life of the process.
            while let Ok(fd) = rx.try_recv() {
                // SAFETY: each queued descriptor is received (and therefore
                // owned) exactly once here.
                unsafe { libc::close(fd) };
            }
        }
    }
}
/// Asks `vm` to stop, asynchronously, on `queue`.
///
/// Does nothing if `end` already holds an end state. Otherwise a closure is
/// queued that calls `stopWithCompletionHandler`; when the completion handler
/// runs it records `SessionEnd::Stopped` in `end`. The handler does not look
/// at the error pointer it receives.
fn issue_stop(queue: &DispatchQueue, vm: &Retained<VZVirtualMachine>, end: &Arc<EndSlot>) {
    if !end.is_set() {
        let slot = Arc::clone(end);
        let bound_vm = QueueBound(vm.clone());
        queue.exec_async(move || {
            let on_stopped = RcBlock::new(move |_err: *mut NSError| {
                slot.set(SessionEnd::Stopped);
            });
            // The call is made from a closure running on `queue`.
            unsafe { bound_vm.stopWithCompletionHandler(&on_stopped) };
        });
    }
}
/// Owns a directory path and removes that directory, with everything in it,
/// when dropped.
struct ControlDirGuard(PathBuf);

impl Drop for ControlDirGuard {
    fn drop(&mut self) {
        let Self(dir) = self;
        // Best-effort cleanup: a failure (e.g. the directory is already
        // gone) is intentionally ignored.
        let _cleanup = std::fs::remove_dir_all(&*dir);
    }
}
fn make_control_dir() -> Result<PathBuf, Error> {
static SEQ: AtomicU64 = AtomicU64::new(0);
let n = SEQ.fetch_add(1, Ordering::Relaxed);
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
let dir =
std::env::temp_dir().join(format!("vmette-ctl-{}-{}-{}", std::process::id(), n, nanos));
std::fs::create_dir_all(&dir).map_err(Error::Io)?;
Ok(dir)
}
/// A virtual machine session created by [`Session::start`].
///
/// Holds the VM, the dispatch queue it was created with, the shared end-state
/// slot, and the objects that have to stay alive for as long as the session.
pub struct Session {
/// The virtual machine; created with `queue` as its queue.
vm: Retained<VZVirtualMachine>,
/// Serial dispatch queue on which VM calls (delegate setup, start, stop) are issued.
queue: DispatchRetained<DispatchQueue>,
/// Shared slot that receives the session's end state; `wait` blocks on it.
end: Arc<EndSlot>,
/// The vsock port resolved from the configuration, if any.
vsock_port: Option<u32>,
/// Kernel command line built for this session.
cmdline: String,
/// Connection state for agent requests; shared with `SessionClient` handles.
agent: Arc<AgentConn>,
/// Keeps the VM delegate alive for the lifetime of the session.
/// NOTE(review): presumably needed because the VM does not retain its
/// delegate strongly — confirm against the framework documentation.
_delegate: Retained<VmetteDelegate>,
/// Keeps the vsock listener and its delegate object alive, when vsock is in use.
_vsock_keepalive: Option<(Retained<VsockLogger>, Retained<VZVirtioSocketListener>)>,
/// Removes the control directory (if one was created) when the session is dropped.
_control_dir: Option<ControlDirGuard>,
}
impl Session {
/// Builds a VM from `config`, installs its delegate and (when a vsock port
/// is resolved) a vsock listener, arms the optional timeout, and queues an
/// asynchronous VM start.
///
/// This returns as soon as the start has been queued. A start failure is
/// not returned from here; it is recorded as `SessionEnd::Error` and is
/// observed through [`Session::wait`].
///
/// # Errors
/// * `Error::InvalidConfig` if a control share is required and `config`
///   already contains a share tagged `"ctl"`.
/// * Whatever `make_control_dir` or `build_vz_config` return on failure.
pub fn start(config: &Config) -> Result<Session, Error> {
let vsock_port = resolve_vsock_port(config.vsock_port);
// Work on a copy so the caller's config is never modified.
let mut working = config.clone();
let mut control_dir: Option<ControlDirGuard> = None;
// A host-side control directory is set up when booting from a block
// rootfs or from a writable rootfs share.
// NOTE(review): presumably because in those modes the guest needs a
// separate channel to report its exit code — confirm.
let needs_ctl = config.rootfs_block.is_some()
|| config.rootfs_share.as_ref().is_some_and(|rs| !rs.read_only);
let exit_code_file = if needs_ctl {
// The "ctl" tag is claimed by the share added below.
if config.shares.iter().any(|s| s.tag == "ctl") {
return Err(Error::InvalidConfig(
"share tag \"ctl\" is reserved for the rootfs exit channel".into(),
));
}
let dir = make_control_dir()?;
let p = dir.join(".vmette-exit");
// Best effort: make sure no stale exit file is present; errors ignored.
let _ = std::fs::remove_file(&p);
working.shares.push(ShareMount {
tag: "ctl".into(),
path: dir.clone(),
});
// From here on the directory is deleted when the guard is dropped,
// including on the early-return error paths below.
control_dir = Some(ControlDirGuard(dir));
Some(p)
} else {
None
};
// Command line and VM configuration are derived from the augmented copy.
let cmdline = cmdline::build(&working, vsock_port);
let cfg = build_vz_config(&working, &cmdline, vsock_port)?;
let queue = DispatchQueue::new("com.chamuka.vmette.session", None);
let vm = unsafe {
VZVirtualMachine::initWithConfiguration_queue(VZVirtualMachine::alloc(), &cfg, &queue)
};
let end = EndSlot::new();
// Flag shared between the timeout timer (which sets it) and the delegate.
let timed_out = Arc::new(AtomicBool::new(false));
let delegate = VmetteDelegate::new(DelegateState {
exit_code_file,
timed_out: timed_out.clone(),
end: end.clone(),
});
// Stays `None` unless the workload is `Agent` and a vsock port exists.
let mut agent_rx = None;
let vsock_keepalive = if let Some(port) = vsock_port {
let mode = match config.workload {
WorkloadStrategy::Agent => {
// Capacity-1 channel over which the listener hands connection
// descriptors to `AgentConn`.
let (tx, rx) = sync_channel::<RawFd>(1);
agent_rx = Some(rx);
ListenerMode::Agent {
fd_tx: Mutex::new(Some(tx)),
}
}
WorkloadStrategy::OneShot => ListenerMode::Echo {
ready_handler: Arc::new(Mutex::new(None)),
},
};
let logger = VsockLogger::new(ListenerState { port, mode });
let listener = unsafe { VZVirtioSocketListener::new() };
unsafe {
listener.setDelegate(Some(ProtocolObject::from_ref(&*logger)));
}
// Both objects are returned so the session can keep them alive.
Some((logger, listener))
} else {
None
};
let agent = Arc::new(AgentConn {
workload: config.workload,
rx: Mutex::new(agent_rx),
fd: Mutex::new(None),
io: Mutex::new(()),
});
let setup_vm = QueueBound(vm.clone());
let setup_delegate = QueueBound(delegate.clone());
let setup_listener = vsock_keepalive
.as_ref()
.map(|(_, l)| (QueueBound(l.clone()), vsock_port.unwrap_or(0)));
// Delegate and listener registration run synchronously on the VM's queue,
// so both are in place before the start request is queued below.
queue.exec_sync(move || unsafe {
let proto: &ProtocolObject<dyn VZVirtualMachineDelegate> =
ProtocolObject::from_ref(&*setup_delegate.0);
setup_vm.setDelegate(Some(proto));
if let Some((listener, port)) = &setup_listener {
let sock_dev = setup_vm.socketDevices();
// If the VM has no socket device the listener is silently not attached.
if let Some(dev) = sock_dev.firstObject() {
// NOTE(review): unchecked cast — assumes the first socket device
// is a `VZVirtioSocketDevice`.
let dev: Retained<VZVirtioSocketDevice> = Retained::cast_unchecked(dev);
dev.setSocketListener_forPort(listener, *port);
}
}
});
if let Some(secs) = config.timeout_seconds {
let vm_for_timer = QueueBound(vm.clone());
let timed_out_setter = timed_out.clone();
let end_for_timer = end.clone();
// If the duration cannot be converted, the timer is scheduled for "now".
let when = DispatchTime::try_from(Duration::from_secs(secs as u64))
.unwrap_or(DispatchTime::NOW);
// The timer marks the session as timed out and requests a stop; the
// stop completion records `TimedOut` (a no-op if an end state is
// already recorded). The result of scheduling is ignored.
let _ = queue.after(when, move || {
timed_out_setter.store(true, Ordering::SeqCst);
let end_for_stop = end_for_timer.clone();
let stop_cb = RcBlock::new(move |_err: *mut NSError| {
end_for_stop.set(SessionEnd::TimedOut);
});
unsafe { vm_for_timer.stopWithCompletionHandler(&stop_cb) };
});
}
let vm_for_start = QueueBound(vm.clone());
let end_for_start = end.clone();
// The start is queued asynchronously; only a failure is recorded here.
queue.exec_async(move || {
let start_cb = RcBlock::new(move |err: *mut NSError| {
if !err.is_null() {
let err = unsafe { &*err };
end_for_start.set(SessionEnd::Error(format!(
"vm.start failed: {}",
err.localizedDescription()
)));
}
});
unsafe { vm_for_start.startWithCompletionHandler(&start_cb) };
});
Ok(Session {
vm,
queue,
end,
vsock_port,
cmdline,
agent,
_delegate: delegate,
_vsock_keepalive: vsock_keepalive,
_control_dir: control_dir,
})
}
/// Returns the vsock port resolved for this session, if any.
pub fn vsock_port(&self) -> Option<u32> {
self.vsock_port
}
/// Returns the kernel command line built for this session.
pub fn cmdline(&self) -> &str {
&self.cmdline
}
/// Blocks the calling thread until the session has an end state and
/// returns a copy of it.
pub fn wait(&self) -> SessionEnd {
self.end.wait_end()
}
/// Requests an asynchronous stop of the VM. Does nothing if an end state
/// has already been recorded.
pub fn stop(&self) {
issue_stop(&self.queue, &self.vm, &self.end);
}
/// Sends `action` to the guest agent and returns its response header and
/// payload; forwards to the session's agent connection.
pub fn request(&self, action: &Action) -> Result<(ResponseHeader, Vec<u8>), Error> {
self.agent.request(action)
}
/// Returns a cloneable handle that issues requests over the same agent
/// connection as this session.
pub fn client(&self) -> SessionClient {
SessionClient {
agent: self.agent.clone(),
}
}
/// Returns a handle that can request a stop of this session's VM; it holds
/// its own references to the VM, the queue and the end slot.
pub fn stop_handle(&self) -> StopHandle {
StopHandle {
vm: QueueBound(self.vm.clone()),
queue: self.queue.clone(),
end: self.end.clone(),
}
}
}
/// Cloneable handle for issuing agent requests; shares the agent connection
/// of the session it was obtained from.
#[derive(Clone)]
pub struct SessionClient {
/// Agent connection state shared with the originating session.
agent: Arc<AgentConn>,
}
impl SessionClient {
/// Sends `action` to the guest agent and returns its response header and
/// payload; forwards to the shared agent connection.
pub fn request(&self, action: &Action) -> Result<(ResponseHeader, Vec<u8>), Error> {
self.agent.request(action)
}
}
/// Handle that can request a stop of a session's VM; it holds its own
/// references to the VM, its queue and the end slot.
pub struct StopHandle {
/// The VM to stop, wrapped so the handle can cross threads.
vm: QueueBound<VZVirtualMachine>,
/// Queue on which the stop call is issued.
queue: DispatchRetained<DispatchQueue>,
/// Shared end-state slot; a stop is skipped once it is set.
end: Arc<EndSlot>,
}
impl StopHandle {
/// Requests an asynchronous stop of the VM. Does nothing if an end state
/// has already been recorded.
pub fn stop(&self) {
issue_stop(&self.queue, &self.vm.0, &self.end);
}
}
/// Non-owning byte stream over a raw file descriptor.
///
/// The descriptor is borrowed: this type never closes it. Reads and writes
/// go through the standard library's `File` implementation, which applies
/// the platform's per-call length limits (macOS rejects `read`/`write`
/// counts above `INT_MAX` with `EINVAL`) instead of passing arbitrary
/// buffer lengths straight to the system call.
struct FdStream(RawFd);

impl FdStream {
    /// Views the descriptor as a `File` without taking ownership of it.
    ///
    /// Returns an error for a negative descriptor rather than handing an
    /// invalid value to the standard library.
    fn borrowed(&self) -> std::io::Result<std::mem::ManuallyDrop<std::fs::File>> {
        if self.0 < 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "invalid file descriptor",
            ));
        }
        // SAFETY: the descriptor is non-negative and is expected to be open
        // for the duration of the call; wrapping the `File` in `ManuallyDrop`
        // guarantees it is never closed here, so ownership stays with
        // whoever handed the descriptor to `FdStream`.
        let file = unsafe { <std::fs::File as std::os::fd::FromRawFd>::from_raw_fd(self.0) };
        Ok(std::mem::ManuallyDrop::new(file))
    }
}

impl std::io::Read for FdStream {
    /// Reads up to `buf.len()` bytes; `Ok(0)` means end of stream (or an
    /// empty `buf`). OS errors are returned unchanged.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut file = self.borrowed()?;
        std::io::Read::read(&mut *file, buf)
    }
}

impl std::io::Write for FdStream {
    /// Writes up to `buf.len()` bytes and returns how many were accepted.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut file = self.borrowed()?;
        std::io::Write::write(&mut *file, buf)
    }

    /// Nothing is buffered in user space, so there is nothing to flush.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}